Commit 6f789a6a authored by Oleg Drokin, committed by Greg Kroah-Hartman

staging/lustre/ldlm: Adjust comments to better conform to coding style

This patch fixes "Block comments use a trailing */ on a separate line"
warnings from checkpatch.
Signed-off-by: Oleg Drokin <green@linuxhacker.ru>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
parent ec9a1ac2
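
For context, the checkpatch rule addressed here wants the closing */ of a multi-line block comment on its own line; single-line comments are unaffected. A minimal illustrative sketch of the preferred style (made-up declaration, not taken from the patched files):

        /* A multi-line block comment in the preferred kernel style:
         * the text ends here and the closing marker sits alone below.
         */
        int dummy_counter;      /* one-line comments keep the trailing marker */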
@@ -133,7 +133,8 @@ static void __rotate_change_maxhigh(struct interval_node *node,
 /* The left rotation "pivots" around the link from node to node->right, and
  * - node will be linked to node->right's left child, and
- * - node->right's left child will be linked to node's right child. */
+ * - node->right's left child will be linked to node's right child.
+ */
 static void __rotate_left(struct interval_node *node,
                           struct interval_node **root)
 {
@@ -162,7 +163,8 @@ static void __rotate_left(struct interval_node *node,
 /* The right rotation "pivots" around the link from node to node->left, and
  * - node will be linked to node->left's right child, and
- * - node->left's right child will be linked to node's left child. */
+ * - node->left's right child will be linked to node's left child.
+ */
 static void __rotate_right(struct interval_node *node,
                            struct interval_node **root)
 {
...
@@ -62,7 +62,8 @@
  * is the "highest lock". This function returns the new KMS value.
  * Caller must hold lr_lock already.
  *
- * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
+ * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes!
+ */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
 {
         struct ldlm_resource *res = lock->l_resource;
@@ -72,7 +73,8 @@ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
         /* don't let another thread in ldlm_extent_shift_kms race in
          * just after we finish and take our lock into account in its
-         * calculation of the kms */
+         * calculation of the kms
+         */
         lock->l_flags |= LDLM_FL_KMS_IGNORE;
         list_for_each(tmp, &res->lr_granted) {
@@ -85,7 +87,8 @@ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
                         return old_kms;
                 /* This extent _has_ to be smaller than old_kms (checked above)
-                 * so kms can only ever be smaller or the same as old_kms. */
+                 * so kms can only ever be smaller or the same as old_kms.
+                 */
                 if (lck->l_policy_data.l_extent.end + 1 > kms)
                         kms = lck->l_policy_data.l_extent.end + 1;
         }
@@ -191,7 +194,8 @@ void ldlm_extent_add_lock(struct ldlm_resource *res,
         res->lr_itree[idx].lit_size++;
         /* even though we use interval tree to manage the extent lock, we also
-         * add the locks into grant list, for debug purpose, .. */
+         * add the locks into grant list, for debug purpose, ..
+         */
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
 }
...
@@ -107,7 +107,8 @@ ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                 /* when reaching here, it is under lock_res_and_lock(). Thus,
-                    need call the nolock version of ldlm_lock_decref_internal*/
+                 * need call the nolock version of ldlm_lock_decref_internal
+                 */
                 ldlm_lock_decref_internal_nolock(lock, mode);
         }
@@ -159,13 +160,15 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
         *err = ELDLM_OK;
         /* No blocking ASTs are sent to the clients for
-         * Posix file & record locks */
+         * Posix file & record locks
+         */
         req->l_blocking_ast = NULL;
 reprocess:
         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                 /* This loop determines where this processes locks start
-                 * in the resource lr_granted list. */
+                 * in the resource lr_granted list.
+                 */
                 list_for_each(tmp, &res->lr_granted) {
                         lock = list_entry(tmp, struct ldlm_lock,
                                           l_res_link);
@@ -180,7 +183,8 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
                 lockmode_verify(mode);
                 /* This loop determines if there are existing locks
-                 * that conflict with the new lock request. */
+                 * that conflict with the new lock request.
+                 */
                 list_for_each(tmp, &res->lr_granted) {
                         lock = list_entry(tmp, struct ldlm_lock,
                                           l_res_link);
@@ -238,8 +242,8 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
         }
         /* Scan the locks owned by this process that overlap this request.
-         * We may have to merge or split existing locks. */
+         * We may have to merge or split existing locks.
+         */
         if (!ownlocks)
                 ownlocks = &res->lr_granted;
@@ -253,7 +257,8 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
                 /* If the modes are the same then we need to process
                  * locks that overlap OR adjoin the new lock. The extra
                  * logic condition is necessary to deal with arithmetic
-                 * overflow and underflow. */
+                 * overflow and underflow.
+                 */
                 if ((new->l_policy_data.l_flock.start >
                      (lock->l_policy_data.l_flock.end + 1))
                     && (lock->l_policy_data.l_flock.end !=
@@ -327,11 +332,13 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
                  * with the request but this would complicate the reply
                  * processing since updates to req get reflected in the
                  * reply. The client side replays the lock request so
-                 * it must see the original lock data in the reply. */
+                 * it must see the original lock data in the reply.
+                 */
                 /* XXX - if ldlm_lock_new() can sleep we should
                  * release the lr_lock, allocate the new lock,
-                 * and restart processing this lock. */
+                 * and restart processing this lock.
+                 */
                 if (!new2) {
                         unlock_res_and_lock(req);
                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
@@ -396,7 +403,8 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 /* The only one possible case for client-side calls flock
                  * policy function is ldlm_flock_completion_ast inside which
-                 * carries LDLM_FL_WAIT_NOREPROC flag. */
+                 * carries LDLM_FL_WAIT_NOREPROC flag.
+                 */
                 CERROR("Illegal parameter for client-side-only module.\n");
                 LBUG();
         }
@@ -404,7 +412,8 @@ static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
         /* In case we're reprocessing the requested lock we can't destroy
          * it until after calling ldlm_add_ast_work_item() above so that laawi()
          * can bump the reference count on \a req. Otherwise \a req
-         * could be freed before the completion AST can be sent. */
+         * could be freed before the completion AST can be sent.
+         */
         if (added)
                 ldlm_flock_destroy(req, mode, *flags);
@@ -458,7 +467,8 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         /* Import invalidation. We need to actually release the lock
          * references being held, so that it can go away. No point in
          * holding the lock even if app still believes it has it, since
-         * server already dropped it anyway. Only for granted locks too. */
+         * server already dropped it anyway. Only for granted locks too.
+         */
         if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
             (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                 if (lock->l_req_mode == lock->l_granted_mode &&
@@ -539,7 +549,8 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         } else if (flags & LDLM_FL_TEST_LOCK) {
                 /* fcntl(F_GETLK) request */
                 /* The old mode was saved in getlk->fl_type so that if the mode
-                 * in the lock changes we can decref the appropriate refcount.*/
+                 * in the lock changes we can decref the appropriate refcount.
+                 */
                 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
                 switch (lock->l_granted_mode) {
                 case LCK_PR:
@@ -558,7 +569,8 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
                 /* We need to reprocess the lock to do merges or splits
-                 * with existing locks owned by this process. */
+                 * with existing locks owned by this process.
+                 */
                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
         }
         unlock_res_and_lock(lock);
@@ -575,7 +587,8 @@ void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
         /* Compat code, old clients had no idea about owner field and
          * relied solely on pid for ownership. Introduced in LU-104, 2.1,
-         * April 2011 */
+         * April 2011
+         */
         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
 }
...
@@ -96,7 +96,8 @@
         LDLM_CANCEL_SHRINK = 1 << 2,  /* Cancel locks from shrinker. */
         LDLM_CANCEL_LRUR = 1 << 3,    /* Cancel locks from lru resize. */
         LDLM_CANCEL_NO_WAIT = 1 << 4  /* Cancel locks w/o blocking (neither
-                                       * sending nor waiting for any rpcs) */
+                                       * sending nor waiting for any rpcs)
+                                       */
 };
 int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
...
@@ -219,7 +219,8 @@ EXPORT_SYMBOL(client_import_find_conn);
 void client_destroy_import(struct obd_import *imp)
 {
         /* Drop security policy instance after all RPCs have finished/aborted
-         * to let all busy contexts be released. */
+         * to let all busy contexts be released.
+         */
         class_import_get(imp);
         class_destroy_import(imp);
         sptlrpc_import_sec_put(imp);
@@ -245,7 +246,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         int rc;
         /* In a more perfect world, we would hang a ptlrpc_client off of
-         * obd_type and just use the values from there. */
+         * obd_type and just use the values from there.
+         */
         if (!strcmp(name, LUSTRE_OSC_NAME)) {
                 rq_portal = OST_REQUEST_PORTAL;
                 rp_portal = OSC_REPLY_PORTAL;
@@ -348,7 +350,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         /* This value may be reduced at connect time in
          * ptlrpc_connect_interpret() . We initialize it to only
          * 1MB until we know what the performance looks like.
-         * In the future this should likely be increased. LU-1431 */
+         * In the future this should likely be increased. LU-1431
+         */
         cli->cl_max_pages_per_rpc = min_t(int, PTLRPC_MAX_BRW_PAGES,
                                           LNET_MTU >> PAGE_CACHE_SHIFT);
@@ -545,14 +548,16 @@ int client_disconnect_export(struct obd_export *exp)
         /* Mark import deactivated now, so we don't try to reconnect if any
          * of the cleanup RPCs fails (e.g. LDLM cancel, etc). We don't
-         * fully deactivate the import, or that would drop all requests. */
+         * fully deactivate the import, or that would drop all requests.
+         */
         spin_lock(&imp->imp_lock);
         imp->imp_deactive = 1;
         spin_unlock(&imp->imp_lock);
         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
          * delete it regardless. (It's safe to delete an import that was
-         * never added.) */
+         * never added.)
+         */
         (void)ptlrpc_pinger_del_import(imp);
         if (obd->obd_namespace) {
@@ -564,7 +569,8 @@ int client_disconnect_export(struct obd_export *exp)
         }
         /* There's no need to hold sem while disconnecting an import,
-         * and it may actually cause deadlock in GSS. */
+         * and it may actually cause deadlock in GSS.
+         */
         up_write(&cli->cl_sem);
         rc = ptlrpc_disconnect_import(imp, 0);
         down_write(&cli->cl_sem);
@@ -573,7 +579,8 @@ int client_disconnect_export(struct obd_export *exp)
 out_disconnect:
         /* Use server style - class_disconnect should be always called for
-         * o_disconnect. */
+         * o_disconnect.
+         */
         err = class_disconnect(exp);
         if (!rc && err)
                 rc = err;
@@ -592,7 +599,8 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
         struct obd_device *obd;
         /* Check that we still have all structures alive as this may
-         * be some late RPC at shutdown time. */
+         * be some late RPC at shutdown time.
+         */
         if (unlikely(!req->rq_export || !req->rq_export->exp_obd ||
                      !exp_connect_lru_resize(req->rq_export))) {
                 lustre_msg_set_slv(req->rq_repmsg, 0);
@@ -697,7 +705,8 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
                  * reply ref until ptlrpc_handle_rs() is done
                  * with the reply state (if the send was successful, there
                  * would have been +1 ref for the net, which
-                 * reply_out_callback leaves alone) */
+                 * reply_out_callback leaves alone)
+                 */
                 rs->rs_on_net = 0;
                 ptlrpc_rs_addref(rs);
         }
...
@@ -326,9 +326,11 @@ static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
         if (lock->l_export && lock->l_export->exp_lock_hash) {
                 /* NB: it's safe to call cfs_hash_del() even lock isn't
-                 * in exp_lock_hash. */
+                 * in exp_lock_hash.
+                 */
                 /* In the function below, .hs_keycmp resolves to
-                 * ldlm_export_lock_keycmp() */
+                 * ldlm_export_lock_keycmp()
+                 */
                 /* coverity[overrun-buffer-val] */
                 cfs_hash_del(lock->l_export->exp_lock_hash,
                              &lock->l_remote_handle, &lock->l_exp_hash);
@@ -540,7 +542,8 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                 return NULL;
         /* It's unlikely but possible that someone marked the lock as
-         * destroyed after we did handle2object on it */
+         * destroyed after we did handle2object on it
+         */
         if (flags == 0 && ((lock->l_flags & LDLM_FL_DESTROYED) == 0)) {
                 lu_ref_add(&lock->l_reference, "handle", current);
                 return lock;
@@ -600,7 +603,8 @@ static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                 LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                 lock->l_flags |= LDLM_FL_AST_SENT;
                 /* If the enqueuing client said so, tell the AST recipient to
-                 * discard dirty data, rather than writing back. */
+                 * discard dirty data, rather than writing back.
+                 */
                 if (new->l_flags & LDLM_FL_AST_DISCARD_DATA)
                         lock->l_flags |= LDLM_FL_DISCARD_DATA;
                 LASSERT(list_empty(&lock->l_bl_ast));
@@ -769,7 +773,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
         if (lock->l_flags & LDLM_FL_LOCAL &&
             !lock->l_readers && !lock->l_writers) {
                 /* If this is a local lock on a server namespace and this was
-                 * the last reference, cancel the lock. */
+                 * the last reference, cancel the lock.
+                 */
                 CDEBUG(D_INFO, "forcing cancel of local lock\n");
                 lock->l_flags |= LDLM_FL_CBPENDING;
         }
@@ -777,7 +782,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
         if (!lock->l_readers && !lock->l_writers &&
             (lock->l_flags & LDLM_FL_CBPENDING)) {
                 /* If we received a blocked AST and this was the last reference,
-                 * run the callback. */
+                 * run the callback.
+                 */
                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
@@ -798,7 +804,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 LDLM_DEBUG(lock, "add lock into lru list");
                 /* If this is a client-side namespace and this was the last
-                 * reference, put it on the LRU. */
+                 * reference, put it on the LRU.
+                 */
                 ldlm_lock_add_to_lru(lock);
                 unlock_res_and_lock(lock);
@@ -807,7 +814,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                  * are not supported by the server, otherwise, it is done on
-                 * enqueue. */
+                 * enqueue.
+                 */
                 if (!exp_connect_cancelset(lock->l_conn_export) &&
                     !ns_connect_lru_resize(ns))
                         ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
@@ -910,7 +918,8 @@ static void search_granted_lock(struct list_head *queue,
                         if (lock->l_policy_data.l_inodebits.bits ==
                             req->l_policy_data.l_inodebits.bits) {
                                 /* insert point is last lock of
-                                 * the policy group */
+                                 * the policy group
+                                 */
                                 prev->res_link =
                                         &policy_end->l_res_link;
                                 prev->mode_link =
@@ -931,7 +940,8 @@ static void search_granted_lock(struct list_head *queue,
                 } /* loop over policy groups within the mode group */
                 /* insert point is last lock of the mode group,
-                 * new policy group is started */
+                 * new policy group is started
+                 */
                 prev->res_link = &mode_end->l_res_link;
                 prev->mode_link = &mode_end->l_sl_mode;
                 prev->policy_link = &req->l_sl_policy;
@@ -943,7 +953,8 @@ static void search_granted_lock(struct list_head *queue,
         }
         /* insert point is last lock on the queue,
-         * new mode group and new policy group are started */
+         * new mode group and new policy group are started
+         */
         prev->res_link = queue->prev;
         prev->mode_link = &req->l_sl_mode;
         prev->policy_link = &req->l_sl_policy;
@@ -1053,7 +1064,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                         break;
                 /* Check if this lock can be matched.
-                 * Used by LU-2919(exclusive open) for open lease lock */
+                 * Used by LU-2919(exclusive open) for open lease lock
+                 */
                 if (ldlm_is_excl(lock))
                         continue;
@@ -1062,7 +1074,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                  * if it passes in CBPENDING and the lock still has users.
                  * this is generally only going to be used by children
                  * whose parents already hold a lock so forward progress
-                 * can still happen. */
+                 * can still happen.
+                 */
                 if (lock->l_flags & LDLM_FL_CBPENDING &&
                     !(flags & LDLM_FL_CBPENDING))
                         continue;
@@ -1086,7 +1099,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                         continue;
                 /* We match if we have existing lock with same or wider set
-                    of bits. */
+                 * of bits.
+                 */
                 if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                       policy->l_inodebits.bits) !=
@@ -1515,7 +1529,8 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
         if (lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted
                  * before we got a chance to actually enqueue it. We don't
-                 * need to do anything else. */
+                 * need to do anything else.
+                 */
                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                 goto out;
@@ -1528,7 +1543,8 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 LBUG();
         /* Some flags from the enqueue want to make it into the AST, via the
-         * lock's l_flags. */
+         * lock's l_flags.
+         */
         lock->l_flags |= *flags & LDLM_FL_AST_DISCARD_DATA;
         /*
@@ -1609,14 +1625,16 @@ ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
          * This can't happen with the blocking_ast, however, because we
          * will never call the local blocking_ast until we drop our
          * reader/writer reference, which we won't do until we get the
-         * reply and finish enqueueing. */
+         * reply and finish enqueueing.
+         */
         /* nobody should touch l_cp_ast */
         lock_res_and_lock(lock);
         list_del_init(&lock->l_cp_ast);
         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
         /* save l_completion_ast since it can be changed by
-         * mds_intent_policy(), see bug 14225 */
+         * mds_intent_policy(), see bug 14225
+         */
         completion_callback = lock->l_completion_ast;
         lock->l_flags &= ~LDLM_FL_CP_REQD;
         unlock_res_and_lock(lock);
@@ -1737,7 +1755,8 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
         /* We create a ptlrpc request set with flow control extension.
          * This request set will use the work_ast_lock function to produce new
          * requests and will send a new request each time one completes in order
-         * to keep the number of requests in flight to ns_max_parallel_ast */
+         * to keep the number of requests in flight to ns_max_parallel_ast
+         */
         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
                                      work_ast_lock, arg);
         if (!arg->set) {
@@ -1803,7 +1822,8 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ns = ldlm_res_to_ns(res);
         /* Please do not, no matter how tempting, remove this LBUG without
-         * talking to me first. -phik */
+         * talking to me first. -phik
+         */
         if (lock->l_readers || lock->l_writers) {
                 LDLM_ERROR(lock, "lock still has references");
                 LBUG();
@@ -1819,7 +1839,8 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_pool_del(&ns->ns_pool, lock);
         /* Make sure we will not be called again for same lock what is possible
-         * if not to zero out lock->l_granted_mode */
+         * if not to zero out lock->l_granted_mode
+         */
         lock->l_granted_mode = LCK_MINMODE;
         unlock_res_and_lock(lock);
 }
...
@@ -194,7 +194,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                         goto out;
                 }
         } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
-                                             * variable length */
+                                             * variable length
+                                             */
                 void *lvb_data;
                 lvb_data = kzalloc(lvb_len, GFP_NOFS);
@@ -224,7 +225,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
         /* If we receive the completion AST before the actual enqueue returned,
-         * then we might need to switch lock modes, resources, or extents. */
+         * then we might need to switch lock modes, resources, or extents.
+         */
         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                 LDLM_DEBUG(lock, "completion AST, new lock mode");
@@ -256,7 +258,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                 /* BL_AST locks are not needed in LRU.
-                 * Let ldlm_cancel_lru() be fast. */
+                 * Let ldlm_cancel_lru() be fast.
+                 */
                 ldlm_lock_remove_from_lru(lock);
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
@@ -276,8 +279,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
-        /* Let Enqueue to call osc_lock_upcall() and initialize
-         * l_ast_data */
+        /* Let Enqueue to call osc_lock_upcall() and initialize l_ast_data */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
@@ -371,7 +373,8 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
         wake_up(&blp->blp_waitq);
         /* can not check blwi->blwi_flags as blwi could be already freed in
-            LCF_ASYNC mode */
+         * LCF_ASYNC mode
+         */
         if (!(cancel_flags & LCF_ASYNC))
                 wait_for_completion(&blwi->blwi_comp);
@@ -541,7 +544,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         /* Requests arrive in sender's byte order. The ptlrpc service
          * handler has already checked and, if necessary, byte-swapped the
          * incoming request message body, but I am responsible for the
-         * message buffers. */
+         * message buffers.
+         */
         /* do nothing for sec context finalize */
         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
@@ -603,7 +607,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         }
         /* Force a known safe race, send a cancel to the server for a lock
-         * which the server has already started a blocking callback on. */
+         * which the server has already started a blocking callback on.
+         */
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
@@ -633,7 +638,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 /* If somebody cancels lock and cache is already dropped,
                  * or lock is failed before cp_ast received on client,
                  * we can tell the server we have no lock. Otherwise, we
-                 * should send cancel after dropping the cache. */
+                 * should send cancel after dropping the cache.
+                 */
                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
                      (lock->l_flags & LDLM_FL_BL_DONE)) ||
                     (lock->l_flags & LDLM_FL_FAILED)) {
@@ -647,7 +653,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                         return 0;
                 }
                 /* BL_AST locks are not needed in LRU.
-                 * Let ldlm_cancel_lru() be fast. */
+                 * Let ldlm_cancel_lru() be fast.
+                 */
                 ldlm_lock_remove_from_lru(lock);
                 lock->l_flags |= LDLM_FL_BL_AST;
         }
@@ -660,7 +667,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * But we'd also like to be able to indicate in the reply that we're
          * cancelling right now, because it's unused, or have an intent result
          * in the reply, so we might have to push the responsibility for sending
-         * the reply down into the AST handlers, alas. */
+         * the reply down into the AST handlers, alas.
+         */
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case LDLM_BL_CALLBACK:
@@ -809,7 +817,8 @@ static int ldlm_bl_thread_main(void *arg)
                         /* The special case when we cancel locks in LRU
                          * asynchronously, we pass the list of locks here.
                          * Thus locks are marked LDLM_FL_CANCELING, but NOT
-                         * canceled locally yet. */
+                         * canceled locally yet.
+                         */
                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                            blwi->blwi_count,
                                                            LCF_BL_AST);
@@ -1116,7 +1125,8 @@ void ldlm_exit(void)
         kmem_cache_destroy(ldlm_resource_slab);
         /* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
          * synchronize_rcu() to wait a grace period elapsed, so that
-         * ldlm_lock_free() get a chance to be called. */
+         * ldlm_lock_free() get a chance to be called.
+         */
         synchronize_rcu();
         kmem_cache_destroy(ldlm_lock_slab);
         kmem_cache_destroy(ldlm_interval_slab);
...
...@@ -128,7 +128,8 @@ static int ldlm_expired_completion_wait(void *data) ...@@ -128,7 +128,8 @@ static int ldlm_expired_completion_wait(void *data)
} }
/* We use the same basis for both server side and client side functions /* We use the same basis for both server side and client side functions
from a single node. */ * from a single node.
*/
static int ldlm_get_enq_timeout(struct ldlm_lock *lock) static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
{ {
int timeout = at_get(ldlm_lock_to_ns_at(lock)); int timeout = at_get(ldlm_lock_to_ns_at(lock));
...@@ -136,8 +137,9 @@ static int ldlm_get_enq_timeout(struct ldlm_lock *lock) ...@@ -136,8 +137,9 @@ static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
if (AT_OFF) if (AT_OFF)
return obd_timeout / 2; return obd_timeout / 2;
/* Since these are non-updating timeouts, we should be conservative. /* Since these are non-updating timeouts, we should be conservative.
It would be nice to have some kind of "early reply" mechanism for * It would be nice to have some kind of "early reply" mechanism for
lock callbacks too... */ * lock callbacks too...
*/
timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */ timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
return max(timeout, ldlm_enqueue_min); return max(timeout, ldlm_enqueue_min);
} }
...@@ -243,8 +245,9 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) ...@@ -243,8 +245,9 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
imp = obd->u.cli.cl_import; imp = obd->u.cli.cl_import;
/* Wait a long time for enqueue - server may have to callback a /* Wait a long time for enqueue - server may have to callback a
lock from another client. Server will evict the other client if it * lock from another client. Server will evict the other client if it
doesn't respond reasonably, and then give us the lock. */ * doesn't respond reasonably, and then give us the lock.
*/
timeout = ldlm_get_enq_timeout(lock) * 2; timeout = ldlm_get_enq_timeout(lock) * 2;
lwd.lwd_lock = lock; lwd.lwd_lock = lock;
...@@ -296,7 +299,8 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, ...@@ -296,7 +299,8 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
!(lock->l_flags & LDLM_FL_FAILED)) { !(lock->l_flags & LDLM_FL_FAILED)) {
/* Make sure that this lock will not be found by raced /* Make sure that this lock will not be found by raced
* bl_ast and -EINVAL reply is sent to server anyways. * bl_ast and -EINVAL reply is sent to server anyways.
* bug 17645 */ * bug 17645
*/
lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED | lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING; LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
need_cancel = 1; need_cancel = 1;
...@@ -312,11 +316,13 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, ...@@ -312,11 +316,13 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
ldlm_lock_decref_internal(lock, mode); ldlm_lock_decref_internal(lock, mode);
/* XXX - HACK because we shouldn't call ldlm_lock_destroy() /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
* from llite/file.c/ll_file_flock(). */ * from llite/file.c/ll_file_flock().
*/
/* This code makes for the fact that we do not have blocking handler on /* This code makes for the fact that we do not have blocking handler on
* a client for flock locks. As such this is the place where we must * a client for flock locks. As such this is the place where we must
* completely kill failed locks. (interrupted and those that * completely kill failed locks. (interrupted and those that
* were waiting to be granted when server evicted us. */ * were waiting to be granted when server evicted us.
*/
if (lock->l_resource->lr_type == LDLM_FLOCK) { if (lock->l_resource->lr_type == LDLM_FLOCK) {
lock_res_and_lock(lock); lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock); ldlm_resource_unlink_lock(lock);
...@@ -402,7 +408,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -402,7 +408,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
/* Key change rehash lock in per-export hash with new key */ /* Key change rehash lock in per-export hash with new key */
if (exp->exp_lock_hash) { if (exp->exp_lock_hash) {
/* In the function below, .hs_keycmp resolves to /* In the function below, .hs_keycmp resolves to
* ldlm_export_lock_keycmp() */ * ldlm_export_lock_keycmp()
*/
/* coverity[overrun-buffer-val] */ /* coverity[overrun-buffer-val] */
cfs_hash_rehash_key(exp->exp_lock_hash, cfs_hash_rehash_key(exp->exp_lock_hash,
&lock->l_remote_handle, &lock->l_remote_handle,
...@@ -416,7 +423,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -416,7 +423,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags & lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
LDLM_INHERIT_FLAGS); LDLM_INHERIT_FLAGS);
/* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
* to wait with no timeout as well */ * to wait with no timeout as well
*/
lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags & lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
LDLM_FL_NO_TIMEOUT); LDLM_FL_NO_TIMEOUT);
unlock_res_and_lock(lock); unlock_res_and_lock(lock);
...@@ -426,7 +434,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -426,7 +434,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
/* If enqueue returned a blocked lock but the completion handler has /* If enqueue returned a blocked lock but the completion handler has
* already run, then it fixed up the resource and we don't need to do it * already run, then it fixed up the resource and we don't need to do it
* again. */ * again.
*/
if ((*flags) & LDLM_FL_LOCK_CHANGED) { if ((*flags) & LDLM_FL_LOCK_CHANGED) {
int newmode = reply->lock_desc.l_req_mode; int newmode = reply->lock_desc.l_req_mode;
...@@ -468,7 +477,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -468,7 +477,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
if ((*flags) & LDLM_FL_AST_SENT || if ((*flags) & LDLM_FL_AST_SENT ||
/* Cancel extent locks as soon as possible on a liblustre client, /* Cancel extent locks as soon as possible on a liblustre client,
* because it cannot handle asynchronous ASTs robustly (see * because it cannot handle asynchronous ASTs robustly (see
* bug 7311). */ * bug 7311).
*/
(LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) { (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
lock_res_and_lock(lock); lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST; lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
...@@ -477,12 +487,14 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -477,12 +487,14 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
} }
/* If the lock has already been granted by a completion AST, don't /* If the lock has already been granted by a completion AST, don't
* clobber the LVB with an older one. */ * clobber the LVB with an older one.
*/
if (lvb_len != 0) { if (lvb_len != 0) {
/* We must lock or a racing completion might update lvb without /* We must lock or a racing completion might update lvb without
* letting us know and we'll clobber the correct value. * letting us know and we'll clobber the correct value.
* Cannot unlock after the check either, a that still leaves * Cannot unlock after the check either, as that still leaves
* a tiny window for completion to get in */ * a tiny window for completion to get in
*/
lock_res_and_lock(lock); lock_res_and_lock(lock);
if (lock->l_req_mode != lock->l_granted_mode) if (lock->l_req_mode != lock->l_granted_mode)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
...@@ -508,7 +520,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -508,7 +520,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
if (lvb_len && lvb) { if (lvb_len && lvb) {
/* Copy the LVB here, and not earlier, because the completion /* Copy the LVB here, and not earlier, because the completion
* AST (if any) can override what we got in the reply */ * AST (if any) can override what we got in the reply
*/
memcpy(lvb, lock->l_lvb_data, lvb_len); memcpy(lvb, lock->l_lvb_data, lvb_len);
} }
...@@ -594,7 +607,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -594,7 +607,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
/* Cancel LRU locks here _only_ if the server supports /* Cancel LRU locks here _only_ if the server supports
* EARLY_CANCEL. Otherwise we have to send extra CANCEL * EARLY_CANCEL. Otherwise we have to send extra CANCEL
* RPC, which will make us slower. */ * RPC, which will make us slower.
*/
if (avail > count) if (avail > count)
count += ldlm_cancel_lru_local(ns, cancels, to_free, count += ldlm_cancel_lru_local(ns, cancels, to_free,
avail - count, 0, flags); avail - count, 0, flags);
...@@ -619,7 +633,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, ...@@ -619,7 +633,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
/* Skip first lock handler in ldlm_request_pack(), /* Skip first lock handler in ldlm_request_pack(),
* this method will increment @lock_count according * this method will increment @lock_count according
* to the lock handle amount actually written to * to the lock handle amount actually written to
* the buffer. */ * the buffer.
*/
dlm->lock_count = canceloff; dlm->lock_count = canceloff;
} }
/* Pack into the request @pack lock handles. */ /* Pack into the request @pack lock handles. */
...@@ -669,7 +684,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, ...@@ -669,7 +684,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
ns = exp->exp_obd->obd_namespace; ns = exp->exp_obd->obd_namespace;
/* If we're replaying this lock, just check some invariants. /* If we're replaying this lock, just check some invariants.
* If we're creating a new lock, get everything all setup nice. */ * If we're creating a new lock, get everything all setup nicely.
*/
if (is_replay) { if (is_replay) {
lock = ldlm_handle2lock_long(lockh, 0); lock = ldlm_handle2lock_long(lockh, 0);
LASSERT(lock); LASSERT(lock);
...@@ -766,7 +782,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, ...@@ -766,7 +782,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
lockh, rc); lockh, rc);
/* If ldlm_cli_enqueue_fini did not find the lock, we need to free /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
* one reference that we took */ * one reference that we took
*/
if (err == -ENOLCK) if (err == -ENOLCK)
LDLM_LOCK_RELEASE(lock); LDLM_LOCK_RELEASE(lock);
else else
...@@ -842,7 +859,8 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, ...@@ -842,7 +859,8 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
/* XXX: it would be better to pack lock handles grouped by resource. /* XXX: it would be better to pack lock handles grouped by resource.
* so that the server cancel would call filter_lvbo_update() less * so that the server cancel would call filter_lvbo_update() less
* frequently. */ * frequently.
*/
list_for_each_entry(lock, head, l_bl_ast) { list_for_each_entry(lock, head, l_bl_ast) {
if (!count--) if (!count--)
break; break;
...@@ -857,7 +875,8 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, ...@@ -857,7 +875,8 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
/** /**
* Prepare and send a batched cancel RPC. It will include \a count lock * Prepare and send a batched cancel RPC. It will include \a count lock
* handles of locks given in \a cancels list. */ * handles of locks given in \a cancels list.
*/
static int ldlm_cli_cancel_req(struct obd_export *exp, static int ldlm_cli_cancel_req(struct obd_export *exp,
struct list_head *cancels, struct list_head *cancels,
int count, enum ldlm_cancel_flags flags) int count, enum ldlm_cancel_flags flags)
...@@ -969,7 +988,8 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req) ...@@ -969,7 +988,8 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
* is the case when server does not support LRU resize feature. * is the case when server does not support LRU resize feature.
* This is also possible in some recovery cases when server-side * This is also possible in some recovery cases when server-side
* reqs have no reference to the OBD export and thus access to * reqs have no reference to the OBD export and thus access to
* server-side namespace is not possible. */ * server-side namespace is not possible.
*/
if (lustre_msg_get_slv(req->rq_repmsg) == 0 || if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
lustre_msg_get_limit(req->rq_repmsg) == 0) { lustre_msg_get_limit(req->rq_repmsg) == 0) {
DEBUG_REQ(D_HA, req, DEBUG_REQ(D_HA, req,
...@@ -987,7 +1007,8 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req) ...@@ -987,7 +1007,8 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
* to the pool thread. We do not access obd_namespace and pool * to the pool thread. We do not access obd_namespace and pool
* directly here as there is no reliable way to make sure that * directly here as there is no reliable way to make sure that
* they are still alive at cleanup time. Evil races are possible * they are still alive at cleanup time. Evil races are possible
* which may cause Oops at that time. */ * which may cause Oops at that time.
*/
write_lock(&obd->obd_pool_lock); write_lock(&obd->obd_pool_lock);
obd->obd_pool_slv = new_slv; obd->obd_pool_slv = new_slv;
obd->obd_pool_limit = new_limit; obd->obd_pool_limit = new_limit;
...@@ -1026,7 +1047,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh, ...@@ -1026,7 +1047,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh,
} }
/* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
* RPC which goes to canceld portal, so we can cancel other LRU locks * RPC which goes to canceld portal, so we can cancel other LRU locks
* here and send them all as one LDLM_CANCEL RPC. */ * here and send them all as one LDLM_CANCEL RPC.
*/
LASSERT(list_empty(&lock->l_bl_ast)); LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, &cancels); list_add(&lock->l_bl_ast, &cancels);
...@@ -1074,7 +1096,8 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count, ...@@ -1074,7 +1096,8 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
/* Until we have compound requests and can send LDLM_CANCEL /* Until we have compound requests and can send LDLM_CANCEL
* requests batched with generic RPCs, we need to send cancels * requests batched with generic RPCs, we need to send cancels
* with the LDLM_FL_BL_AST flag in a separate RPC from * with the LDLM_FL_BL_AST flag in a separate RPC from
* the one being generated now. */ * the one being generated now.
*/
if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) { if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
LDLM_DEBUG(lock, "Cancel lock separately"); LDLM_DEBUG(lock, "Cancel lock separately");
list_del_init(&lock->l_bl_ast); list_del_init(&lock->l_bl_ast);
...@@ -1114,7 +1137,8 @@ static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, ...@@ -1114,7 +1137,8 @@ static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
lock_res_and_lock(lock); lock_res_and_lock(lock);
/* don't check added & count since we want to process all locks /* don't check added & count since we want to process all locks
* from unused list */ * from unused list
*/
switch (lock->l_resource->lr_type) { switch (lock->l_resource->lr_type) {
case LDLM_EXTENT: case LDLM_EXTENT:
case LDLM_IBITS: case LDLM_IBITS:
...@@ -1150,7 +1174,8 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, ...@@ -1150,7 +1174,8 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
unsigned long la; unsigned long la;
/* Stop LRU processing when we reach past @count or have checked all /* Stop LRU processing when we reach past @count or have checked all
* locks in LRU. */ * locks in LRU.
*/
if (count && added >= count) if (count && added >= count)
return LDLM_POLICY_KEEP_LOCK; return LDLM_POLICY_KEEP_LOCK;
...@@ -1164,7 +1189,8 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, ...@@ -1164,7 +1189,8 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
ldlm_pool_set_clv(pl, lv); ldlm_pool_set_clv(pl, lv);
/* Stop when SLV has not yet come from the server or lv is smaller than /* Stop when SLV has not yet come from the server or lv is smaller than
* it is. */ * it is.
*/
return (slv == 0 || lv < slv) ? return (slv == 0 || lv < slv) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
} }
...@@ -1184,7 +1210,8 @@ static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns, ...@@ -1184,7 +1210,8 @@ static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
int count) int count)
{ {
/* Stop LRU processing when we reach past @count or have checked all /* Stop LRU processing when we reach past @count or have checked all
* locks in LRU. */ * locks in LRU.
*/
return (added >= count) ? return (added >= count) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
} }
...@@ -1225,7 +1252,8 @@ static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns, ...@@ -1225,7 +1252,8 @@ static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
int count) int count)
{ {
/* Stop LRU processing when we reach past count or have checked all /* Stop LRU processing when we reach past count or have checked all
* locks in LRU. */ * locks in LRU.
*/
return (added >= count) ? return (added >= count) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
} }
...@@ -1329,7 +1357,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, ...@@ -1329,7 +1357,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
continue; continue;
/* Somebody is already doing CANCEL. No need for this /* Somebody is already doing CANCEL. No need for this
* lock in LRU, do not traverse it again. */ * lock in LRU, do not traverse it again.
*/
if (!(lock->l_flags & LDLM_FL_CANCELING)) if (!(lock->l_flags & LDLM_FL_CANCELING))
break; break;
...@@ -1378,7 +1407,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, ...@@ -1378,7 +1407,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
/* Another thread is removing lock from LRU, or /* Another thread is removing lock from LRU, or
* somebody is already doing CANCEL, or there * somebody is already doing CANCEL, or there
* is a blocking request which will send cancel * is a blocking request which will send cancel
* by itself, or the lock is no longer unused. */ * by itself, or the lock is no longer unused.
*/
unlock_res_and_lock(lock); unlock_res_and_lock(lock);
lu_ref_del(&lock->l_reference, lu_ref_del(&lock->l_reference,
__func__, current); __func__, current);
...@@ -1392,7 +1422,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, ...@@ -1392,7 +1422,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
* better send cancel notification to server, so that it * better send cancel notification to server, so that it
* frees appropriate state. This might lead to a race * frees appropriate state. This might lead to a race
* where while we are doing cancel here, server is also * where while we are doing cancel here, server is also
* silently cancelling this lock. */ * silently cancelling this lock.
*/
lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK; lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
/* Setting the CBPENDING flag is a little misleading, /* Setting the CBPENDING flag is a little misleading,
...@@ -1400,7 +1431,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, ...@@ -1400,7 +1431,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
* CBPENDING is set, the lock can accumulate no more * CBPENDING is set, the lock can accumulate no more
* readers/writers. Since readers and writers are * readers/writers. Since readers and writers are
* already zero here, ldlm_lock_decref() won't see * already zero here, ldlm_lock_decref() won't see
* this flag and call l_blocking_ast */ * this flag and call l_blocking_ast
*/
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
/* We can't re-add to l_lru as it confuses the /* We can't re-add to l_lru as it confuses the
...@@ -1408,7 +1440,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, ...@@ -1408,7 +1440,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
* arrives after we drop lr_lock below. We use l_bl_ast * arrives after we drop lr_lock below. We use l_bl_ast
* and can't use l_pending_chain as it is used both on * and can't use l_pending_chain as it is used both on
* server and client nevertheless bug 5666 says it is * server and client nevertheless bug 5666 says it is
* used only on server */ * used only on server
*/
LASSERT(list_empty(&lock->l_bl_ast)); LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, cancels); list_add(&lock->l_bl_ast, cancels);
unlock_res_and_lock(lock); unlock_res_and_lock(lock);
...@@ -1449,7 +1482,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ...@@ -1449,7 +1482,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
int count, rc; int count, rc;
/* Just prepare the list of locks, do not actually cancel them yet. /* Just prepare the list of locks, do not actually cancel them yet.
* Locks are cancelled later in a separate thread. */ * Locks are cancelled later in a separate thread.
*/
count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags); count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags); rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
if (rc == 0) if (rc == 0)
...@@ -1485,7 +1519,8 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, ...@@ -1485,7 +1519,8 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
continue; continue;
/* If somebody is already doing CANCEL, or blocking AST came, /* If somebody is already doing CANCEL, or blocking AST came,
* skip this lock. */ * skip this lock.
*/
if (lock->l_flags & LDLM_FL_BL_AST || if (lock->l_flags & LDLM_FL_BL_AST ||
lock->l_flags & LDLM_FL_CANCELING) lock->l_flags & LDLM_FL_CANCELING)
continue; continue;
...@@ -1494,7 +1529,8 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, ...@@ -1494,7 +1529,8 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
continue; continue;
/* If policy is given and this is IBITS lock, add to list only /* If policy is given and this is IBITS lock, add to list only
* those locks that match by policy. */ * those locks that match by policy.
*/
if (policy && (lock->l_resource->lr_type == LDLM_IBITS) && if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
!(lock->l_policy_data.l_inodebits.bits & !(lock->l_policy_data.l_inodebits.bits &
policy->l_inodebits.bits)) policy->l_inodebits.bits))
...@@ -1539,7 +1575,8 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count, ...@@ -1539,7 +1575,8 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
* Usually it is enough to have just 1 RPC, but it is possible that * Usually it is enough to have just 1 RPC, but it is possible that
* there are too many locks to be cancelled in LRU or on a resource. * there are too many locks to be cancelled in LRU or on a resource.
* It would also speed up the case when the server does not support * It would also speed up the case when the server does not support
* the feature. */ * the feature.
*/
while (count > 0) { while (count > 0) {
LASSERT(!list_empty(cancels)); LASSERT(!list_empty(cancels));
lock = list_entry(cancels->next, struct ldlm_lock, lock = list_entry(cancels->next, struct ldlm_lock,
...@@ -1577,7 +1614,8 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list); ...@@ -1577,7 +1614,8 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list);
* Cancel all locks on a resource that have 0 readers/writers. * Cancel all locks on a resource that have 0 readers/writers.
* *
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
* to notify the server. */ * to notify the server.
*/
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id, const struct ldlm_res_id *res_id,
ldlm_policy_data_t *policy, ldlm_policy_data_t *policy,
...@@ -1815,7 +1853,8 @@ static int replay_lock_interpret(const struct lu_env *env, ...@@ -1815,7 +1853,8 @@ static int replay_lock_interpret(const struct lu_env *env,
exp = req->rq_export; exp = req->rq_export;
if (exp && exp->exp_lock_hash) { if (exp && exp->exp_lock_hash) {
/* In the function below, .hs_keycmp resolves to /* In the function below, .hs_keycmp resolves to
* ldlm_export_lock_keycmp() */ * ldlm_export_lock_keycmp()
*/
/* coverity[overrun-buffer-val] */ /* coverity[overrun-buffer-val] */
cfs_hash_rehash_key(exp->exp_lock_hash, cfs_hash_rehash_key(exp->exp_lock_hash,
&lock->l_remote_handle, &lock->l_remote_handle,
...@@ -1850,7 +1889,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) ...@@ -1850,7 +1889,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
/* If this is reply-less callback lock, we cannot replay it, since /* If this is reply-less callback lock, we cannot replay it, since
* server might have long dropped it, but notification of that event was * server might have long dropped it, but notification of that event was
* lost by network. (and server granted conflicting lock already) */ * lost by network. (and server granted conflicting lock already)
*/
if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) { if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
LDLM_DEBUG(lock, "Not replaying reply-less lock:"); LDLM_DEBUG(lock, "Not replaying reply-less lock:");
ldlm_lock_cancel(lock); ldlm_lock_cancel(lock);
...@@ -1901,7 +1941,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) ...@@ -1901,7 +1941,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
/* notify the server we've replayed all requests. /* notify the server we've replayed all requests.
* also, we mark the request to be put on a dedicated * also, we mark the request to be put on a dedicated
 * queue to be processed after all request replays. * queue to be processed after all request replays.
* bug 6063 */ * bug 6063
*/
lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE); lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
LDLM_DEBUG(lock, "replaying lock:"); LDLM_DEBUG(lock, "replaying lock:");
...@@ -1936,7 +1977,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns) ...@@ -1936,7 +1977,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
/* We don't need to care whether or not LRU resize is enabled /* We don't need to care whether or not LRU resize is enabled
* because the LDLM_CANCEL_NO_WAIT policy doesn't use the * because the LDLM_CANCEL_NO_WAIT policy doesn't use the
* count parameter */ * count parameter
*/
canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0, canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
LCF_LOCAL, LDLM_CANCEL_NO_WAIT); LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
......
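The comments in the hunks above describe the client-side pattern: a pluggable policy (lrur, passed, no_wait, default) is consulted for each unused lock in the LRU, selected locks are moved onto a private cancel list via l_bl_ast, and the list is then handed off for a single batched LDLM_CANCEL RPC (see the ldlm_cancel_lru() and ldlm_cli_cancel_list() hunks above). The standalone sketch below models only that scan-and-collect step; struct fake_lock, prepare_cancel_list(), passed_policy() and the simple singly linked list are hypothetical stand-ins for the real LDLM structures, so treat it as an illustration of the idea rather than the Lustre code itself.

#include <stdio.h>

enum policy_res { POLICY_CANCEL_LOCK, POLICY_KEEP_LOCK };

/* Hypothetical stand-in for a DLM lock sitting on the client LRU. */
struct fake_lock {
	int id;
	int in_use;			/* still has readers/writers: skip it */
	struct fake_lock *next;		/* LRU order, oldest first */
};

/* Mirrors the "stop once we have added @count locks" rule of the passed policy. */
static enum policy_res passed_policy(struct fake_lock *lock, int added, int count)
{
	(void)lock;
	return added >= count ? POLICY_KEEP_LOCK : POLICY_CANCEL_LOCK;
}

/* Walk the LRU, ask the policy about each unused lock, and collect victims
 * on @cancels.  Returns how many locks were prepared for cancellation. */
static int prepare_cancel_list(struct fake_lock **lru, struct fake_lock **cancels,
			       int count,
			       enum policy_res (*policy)(struct fake_lock *, int, int))
{
	int added = 0;
	struct fake_lock **p = lru;

	while (*p) {
		struct fake_lock *lock = *p;

		if (lock->in_use) {		/* no longer unused: leave it in the LRU */
			p = &lock->next;
			continue;
		}
		if (policy(lock, added, count) == POLICY_KEEP_LOCK)
			break;			/* policy says stop scanning */

		*p = lock->next;		/* unlink from the LRU ... */
		lock->next = *cancels;		/* ... and push onto the cancel list */
		*cancels = lock;
		added++;
	}
	return added;
}

int main(void)
{
	struct fake_lock locks[4] = {
		{ 1, 0, &locks[1] }, { 2, 1, &locks[2] },
		{ 3, 0, &locks[3] }, { 4, 0, NULL },
	};
	struct fake_lock *lru = &locks[0];
	struct fake_lock *cancels = NULL;
	int n = prepare_cancel_list(&lru, &cancels, 2, passed_policy);

	printf("prepared %d lock(s) for a batched cancel\n", n);
	for (; cancels; cancels = cancels->next)
		printf("  would cancel lock %d\n", cancels->id);
	return 0;
}

In the real code the prepared list is not cancelled inline: as the hunks above show, it is passed to ldlm_bl_to_thread_list() or ldlm_cli_cancel_list(), which splits it into however many LDLM_CANCEL RPCs are needed.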
...@@ -56,7 +56,8 @@ LIST_HEAD(ldlm_srv_namespace_list); ...@@ -56,7 +56,8 @@ LIST_HEAD(ldlm_srv_namespace_list);
struct mutex ldlm_cli_namespace_lock; struct mutex ldlm_cli_namespace_lock;
/* Client Namespaces that have active resources in them. /* Client Namespaces that have active resources in them.
* Once all resources go away, ldlm_poold moves such namespaces to the * Once all resources go away, ldlm_poold moves such namespaces to the
* inactive list */ * inactive list
*/
LIST_HEAD(ldlm_cli_active_namespace_list); LIST_HEAD(ldlm_cli_active_namespace_list);
/* Client namespaces that don't have any locks in them */ /* Client namespaces that don't have any locks in them */
static LIST_HEAD(ldlm_cli_inactive_namespace_list); static LIST_HEAD(ldlm_cli_inactive_namespace_list);
...@@ -66,7 +67,8 @@ static struct dentry *ldlm_ns_debugfs_dir; ...@@ -66,7 +67,8 @@ static struct dentry *ldlm_ns_debugfs_dir;
struct dentry *ldlm_svc_debugfs_dir; struct dentry *ldlm_svc_debugfs_dir;
/* during debug, dump only a certain amount of granted locks for one resource to avoid /* during debug, dump only a certain amount of granted locks for one resource to avoid
* DDOS. */ * DDOS.
*/
static unsigned int ldlm_dump_granted_max = 256; static unsigned int ldlm_dump_granted_max = 256;
static ssize_t static ssize_t
...@@ -275,7 +277,8 @@ static ssize_t lru_size_store(struct kobject *kobj, struct attribute *attr, ...@@ -275,7 +277,8 @@ static ssize_t lru_size_store(struct kobject *kobj, struct attribute *attr,
ldlm_cancel_lru(ns, 0, LCF_ASYNC, LDLM_CANCEL_PASSED); ldlm_cancel_lru(ns, 0, LCF_ASYNC, LDLM_CANCEL_PASSED);
/* Make sure that LRU resize was originally supported before /* Make sure that LRU resize was originally supported before
* turning it on here. */ * turning it on here.
*/
if (lru_resize && if (lru_resize &&
(ns->ns_orig_connect_flags & OBD_CONNECT_LRU_RESIZE)) { (ns->ns_orig_connect_flags & OBD_CONNECT_LRU_RESIZE)) {
CDEBUG(D_DLMTRACE, CDEBUG(D_DLMTRACE,
...@@ -749,7 +752,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, ...@@ -749,7 +752,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
struct lustre_handle lockh; struct lustre_handle lockh;
/* First, we look for non-cleaned-yet lock /* First, we look for non-cleaned-yet lock
* all cleaned locks are marked by CLEANED flag. */ * all cleaned locks are marked by CLEANED flag.
*/
lock_res(res); lock_res(res);
list_for_each(tmp, q) { list_for_each(tmp, q) {
lock = list_entry(tmp, struct ldlm_lock, lock = list_entry(tmp, struct ldlm_lock,
...@@ -769,7 +773,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, ...@@ -769,7 +773,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
} }
/* Set CBPENDING so nothing in the cancellation path /* Set CBPENDING so nothing in the cancellation path
* can match this lock. */ * can match this lock.
*/
lock->l_flags |= LDLM_FL_CBPENDING; lock->l_flags |= LDLM_FL_CBPENDING;
lock->l_flags |= LDLM_FL_FAILED; lock->l_flags |= LDLM_FL_FAILED;
lock->l_flags |= flags; lock->l_flags |= flags;
...@@ -782,7 +787,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, ...@@ -782,7 +787,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
/* This is a little bit gross, but much better than the /* This is a little bit gross, but much better than the
* alternative: pretend that we got a blocking AST from * alternative: pretend that we got a blocking AST from
* the server, so that when the lock is decref'd, it * the server, so that when the lock is decref'd, it
* will go away ... */ * will go away ...
*/
unlock_res(res); unlock_res(res);
LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
if (lock->l_completion_ast) if (lock->l_completion_ast)
...@@ -873,7 +879,8 @@ static int __ldlm_namespace_free(struct ldlm_namespace *ns, int force) ...@@ -873,7 +879,8 @@ static int __ldlm_namespace_free(struct ldlm_namespace *ns, int force)
atomic_read(&ns->ns_bref) == 0, &lwi); atomic_read(&ns->ns_bref) == 0, &lwi);
/* Forced cleanups should be able to reclaim all references, /* Forced cleanups should be able to reclaim all references,
* so it's safe to wait forever... we can't leak locks... */ * so it's safe to wait forever... we can't leak locks...
*/
if (force && rc == -ETIMEDOUT) { if (force && rc == -ETIMEDOUT) {
LCONSOLE_ERROR("Forced cleanup waiting for %s namespace with %d resources in use, (rc=%d)\n", LCONSOLE_ERROR("Forced cleanup waiting for %s namespace with %d resources in use, (rc=%d)\n",
ldlm_ns_name(ns), ldlm_ns_name(ns),
...@@ -943,7 +950,8 @@ static void ldlm_namespace_unregister(struct ldlm_namespace *ns, ...@@ -943,7 +950,8 @@ static void ldlm_namespace_unregister(struct ldlm_namespace *ns,
LASSERT(!list_empty(&ns->ns_list_chain)); LASSERT(!list_empty(&ns->ns_list_chain));
/* Some asserts and possibly other parts of the code are still /* Some asserts and possibly other parts of the code are still
* using list_empty(&ns->ns_list_chain). This is why it is * using list_empty(&ns->ns_list_chain). This is why it is
* important to use list_del_init() here. */ * important to use list_del_init() here.
*/
list_del_init(&ns->ns_list_chain); list_del_init(&ns->ns_list_chain);
ldlm_namespace_nr_dec(client); ldlm_namespace_nr_dec(client);
mutex_unlock(ldlm_namespace_lock(client)); mutex_unlock(ldlm_namespace_lock(client));
...@@ -963,7 +971,8 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns) ...@@ -963,7 +971,8 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns)
ldlm_namespace_unregister(ns, ns->ns_client); ldlm_namespace_unregister(ns, ns->ns_client);
/* Fini pool _before_ parent proc dir is removed. This is important as /* Fini pool _before_ parent proc dir is removed. This is important as
* ldlm_pool_fini() removes own proc dir which is child to @dir. * ldlm_pool_fini() removes own proc dir which is child to @dir.
* Removing it after @dir may cause oops. */ * Removing it after @dir may cause oops.
*/
ldlm_pool_fini(&ns->ns_pool); ldlm_pool_fini(&ns->ns_pool);
ldlm_namespace_debugfs_unregister(ns); ldlm_namespace_debugfs_unregister(ns);
...@@ -971,7 +980,8 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns) ...@@ -971,7 +980,8 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns)
cfs_hash_putref(ns->ns_rs_hash); cfs_hash_putref(ns->ns_rs_hash);
/* Namespace \a ns should not be on the list at this time, otherwise /* Namespace \a ns should not be on the list at this time, otherwise
* this will cause issues related to using freed \a ns in poold * this will cause issues related to using freed \a ns in poold
* thread. */ * thread.
*/
LASSERT(list_empty(&ns->ns_list_chain)); LASSERT(list_empty(&ns->ns_list_chain));
kfree(ns); kfree(ns);
ldlm_put_ref(); ldlm_put_ref();
...@@ -1050,7 +1060,8 @@ static struct ldlm_resource *ldlm_resource_new(void) ...@@ -1050,7 +1060,8 @@ static struct ldlm_resource *ldlm_resource_new(void)
lu_ref_init(&res->lr_reference); lu_ref_init(&res->lr_reference);
/* The creator of the resource must unlock the mutex after LVB /* The creator of the resource must unlock the mutex after LVB
* initialization. */ * initialization.
*/
mutex_init(&res->lr_lvb_mutex); mutex_init(&res->lr_lvb_mutex);
mutex_lock(&res->lr_lvb_mutex); mutex_lock(&res->lr_lvb_mutex);
...@@ -1166,7 +1177,8 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, ...@@ -1166,7 +1177,8 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
/* Let's see if we happened to be the very first resource in this /* Let's see if we happened to be the very first resource in this
* namespace. If so, and this is a client namespace, we need to move * namespace. If so, and this is a client namespace, we need to move
* the namespace into the active namespaces list to be patrolled by * the namespace into the active namespaces list to be patrolled by
* the ldlm_poold. */ * the ldlm_poold.
*/
if (ns_refcount == 1) { if (ns_refcount == 1) {
mutex_lock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT)); mutex_lock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
ldlm_namespace_move_to_active_locked(ns, LDLM_NAMESPACE_CLIENT); ldlm_namespace_move_to_active_locked(ns, LDLM_NAMESPACE_CLIENT);
......
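Every hunk in this patch makes the same mechanical change, so a minimal before/after illustration of the block-comment form checkpatch.pl expects may help. Both functions below are made up purely for illustration; only the comment layout matters.

static int old_style_example(void)
{
	/* A multi-line block comment whose text and closing
	 * terminator share the final line, which checkpatch warns about. */
	return 0;
}

static int new_style_example(void)
{
	/* The same comment after the fix: the closing terminator
	 * is moved onto a line of its own.
	 */
	return 0;
}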