Commit 986ae3c2 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: fix race between final callback and remove

This patch fixes the following issue:

node 1 is dir
node 2 is master
node 3 is other

1->2: unlock
2: put final lkb, rsb moved to toss
2->1: unlock_reply
1: queue lkb callback with EUNLOCK
2->1: remove
1: receive_remove ignored (rsb on keep because of queued lkb callback)
1: complete lkb callback, put_lkb, move rsb to toss
3->1: lookup
1->3: lookup_reply master=2
3->2: request
2->3: request_reply EBADR

In summary:
An unexpected lkb reference causes the rsb to remain on the wrong list.
The rsb being on the wrong list causes receive_remove to be ignored.
An ignored receive_remove causes inconsistent dir and master state.

This sequence requires an unusually long delay in delivering the unlock
callback, because the remove message from 2->1 usually happens after
some seconds.  So, it's not known exactly how frequently this sequence
occurs in pratice.  It's possible that the same end result could also
have another unknown cause.

The solution for this issue is to further separate callback state
from the lkb, so that an lkb reference (and from that, an rsb ref)
are not held while a callback remains queued.  Then, within the
unlock_reply, the lkb will be freed and the rsb moved to the toss
list. So, the receive_remove will not be ignored.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 0175e51b
...@@ -37,12 +37,32 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from, ...@@ -37,12 +37,32 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
*from = to; *from = to;
} }
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, static void dlm_callback_work(struct work_struct *work)
int status, uint32_t sbflags)
{ {
struct dlm_ls *ls = lkb->lkb_resource->res_ls; struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
cb->res_length);
cb->bastfn(cb->astparam, cb->mode);
} else if (cb->flags & DLM_CB_CAST) {
trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
cb->sb_flags, cb->res_name, cb->res_length);
cb->lkb_lksb->sb_status = cb->sb_status;
cb->lkb_lksb->sb_flags = cb->sb_flags;
cb->astfn(cb->astparam);
}
kref_put(&cb->ref, dlm_release_callback);
}
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags,
struct dlm_callback **cb)
{
struct dlm_rsb *rsb = lkb->lkb_resource;
int rv = DLM_ENQUEUE_CALLBACK_SUCCESS; int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
struct dlm_callback *cb; struct dlm_ls *ls = rsb->res_ls;
int copy_lvb = 0; int copy_lvb = 0;
int prev_mode; int prev_mode;
...@@ -88,57 +108,46 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, ...@@ -88,57 +108,46 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
} }
} }
cb = dlm_allocate_cb(); *cb = dlm_allocate_cb();
if (!cb) { if (!*cb) {
rv = DLM_ENQUEUE_CALLBACK_FAILURE; rv = DLM_ENQUEUE_CALLBACK_FAILURE;
goto out; goto out;
} }
cb->flags = flags; /* for tracing */
cb->mode = mode; (*cb)->lkb_id = lkb->lkb_id;
cb->sb_status = status; (*cb)->ls_id = ls->ls_global_id;
cb->sb_flags = (sbflags & 0x000000FF); memcpy((*cb)->res_name, rsb->res_name, rsb->res_length);
cb->copy_lvb = copy_lvb; (*cb)->res_length = rsb->res_length;
kref_init(&cb->ref);
if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
list_add_tail(&cb->list, &lkb->lkb_callbacks); (*cb)->flags = flags;
(*cb)->mode = mode;
(*cb)->sb_status = status;
(*cb)->sb_flags = (sbflags & 0x000000FF);
(*cb)->copy_lvb = copy_lvb;
(*cb)->lkb_lksb = lkb->lkb_lksb;
kref_init(&(*cb)->ref);
if (flags & DLM_CB_BAST) { if (flags & DLM_CB_BAST) {
lkb->lkb_last_bast_time = ktime_get(); lkb->lkb_last_bast_time = ktime_get();
lkb->lkb_last_bast_mode = cb->mode; lkb->lkb_last_bast_mode = mode;
} else if (flags & DLM_CB_CAST) { } else if (flags & DLM_CB_CAST) {
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb); dlm_callback_set_last_ptr(&lkb->lkb_last_cast, *cb);
lkb->lkb_last_cast_time = ktime_get(); lkb->lkb_last_cast_time = ktime_get();
} }
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb); dlm_callback_set_last_ptr(&lkb->lkb_last_cb, *cb);
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
out: out:
return rv; return rv;
} }
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
{
/* oldest undelivered cb is callbacks first entry */
*cb = list_first_entry_or_null(&lkb->lkb_callbacks,
struct dlm_callback, list);
if (!*cb)
return DLM_DEQUEUE_CALLBACK_EMPTY;
/* remove it from callbacks so shift others down */
list_del(&(*cb)->list);
if (list_empty(&lkb->lkb_callbacks))
return DLM_DEQUEUE_CALLBACK_LAST;
return DLM_DEQUEUE_CALLBACK_SUCCESS;
}
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags) uint32_t sbflags)
{ {
struct dlm_ls *ls = lkb->lkb_resource->res_ls; struct dlm_ls *ls = lkb->lkb_resource->res_ls;
struct dlm_callback *cb;
int rv; int rv;
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
...@@ -146,18 +155,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, ...@@ -146,18 +155,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
return; return;
} }
spin_lock(&lkb->lkb_cb_lock); rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags); &cb);
switch (rv) { switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED: case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref); cb->astfn = lkb->lkb_astfn;
cb->bastfn = lkb->lkb_bastfn;
cb->astparam = lkb->lkb_astparam;
INIT_WORK(&cb->work, dlm_callback_work);
spin_lock(&ls->ls_cb_lock); spin_lock(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay); list_add(&cb->list, &ls->ls_cb_delay);
} else { else
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); queue_work(ls->ls_callback_wq, &cb->work);
}
spin_unlock(&ls->ls_cb_lock); spin_unlock(&ls->ls_cb_lock);
break; break;
case DLM_ENQUEUE_CALLBACK_SUCCESS: case DLM_ENQUEUE_CALLBACK_SUCCESS:
...@@ -168,67 +179,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, ...@@ -168,67 +179,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
WARN_ON_ONCE(1); WARN_ON_ONCE(1);
break; break;
} }
spin_unlock(&lkb->lkb_cb_lock);
}
void dlm_callback_work(struct work_struct *work)
{
struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
struct dlm_rsb *rsb = lkb->lkb_resource;
void (*castfn) (void *astparam);
void (*bastfn) (void *astparam, int mode);
struct dlm_callback *cb;
int rv;
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
goto out;
}
spin_unlock(&lkb->lkb_cb_lock);
for (;;) {
castfn = lkb->lkb_astfn;
bastfn = lkb->lkb_bastfn;
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(ls->ls_global_id, lkb->lkb_id,
cb->mode, rsb->res_name,
rsb->res_length);
bastfn(lkb->lkb_astparam, cb->mode);
} else if (cb->flags & DLM_CB_CAST) {
lkb->lkb_lksb->sb_status = cb->sb_status;
lkb->lkb_lksb->sb_flags = cb->sb_flags;
trace_dlm_ast(ls->ls_global_id, lkb->lkb_id,
cb->sb_flags, cb->sb_status,
rsb->res_name, rsb->res_length);
castfn(lkb->lkb_astparam);
}
kref_put(&cb->ref, dlm_release_callback);
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
break;
}
spin_unlock(&lkb->lkb_cb_lock);
}
out:
/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
dlm_put_lkb(lkb);
} }
int dlm_callback_start(struct dlm_ls *ls) int dlm_callback_start(struct dlm_ls *ls)
{ {
ls->ls_callback_wq = alloc_workqueue("dlm_callback", ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
WQ_HIGHPRI | WQ_MEM_RECLAIM, 0); WQ_HIGHPRI | WQ_MEM_RECLAIM);
if (!ls->ls_callback_wq) { if (!ls->ls_callback_wq) {
log_print("can't start dlm_callback workqueue"); log_print("can't start dlm_callback workqueue");
return -ENOMEM; return -ENOMEM;
...@@ -257,7 +213,7 @@ void dlm_callback_suspend(struct dlm_ls *ls) ...@@ -257,7 +213,7 @@ void dlm_callback_suspend(struct dlm_ls *ls)
void dlm_callback_resume(struct dlm_ls *ls) void dlm_callback_resume(struct dlm_ls *ls)
{ {
struct dlm_lkb *lkb, *safe; struct dlm_callback *cb, *safe;
int count = 0, sum = 0; int count = 0, sum = 0;
bool empty; bool empty;
...@@ -266,9 +222,9 @@ void dlm_callback_resume(struct dlm_ls *ls) ...@@ -266,9 +222,9 @@ void dlm_callback_resume(struct dlm_ls *ls)
more: more:
spin_lock(&ls->ls_cb_lock); spin_lock(&ls->ls_cb_lock);
list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) { list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
list_del_init(&lkb->lkb_cb_list); list_del(&cb->list);
queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); queue_work(ls->ls_callback_wq, &cb->work);
count++; count++;
if (count == MAX_CB_QUEUE) if (count == MAX_CB_QUEUE)
break; break;
......
...@@ -14,19 +14,15 @@ ...@@ -14,19 +14,15 @@
#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1 #define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1
#define DLM_ENQUEUE_CALLBACK_SUCCESS 0 #define DLM_ENQUEUE_CALLBACK_SUCCESS 0
#define DLM_ENQUEUE_CALLBACK_FAILURE -1 #define DLM_ENQUEUE_CALLBACK_FAILURE -1
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags); int status, uint32_t sbflags,
#define DLM_DEQUEUE_CALLBACK_EMPTY 2 struct dlm_callback **cb);
#define DLM_DEQUEUE_CALLBACK_LAST 1
#define DLM_DEQUEUE_CALLBACK_SUCCESS 0
int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb);
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
uint32_t sbflags); uint32_t sbflags);
void dlm_callback_set_last_ptr(struct dlm_callback **from, void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to); struct dlm_callback *to);
void dlm_release_callback(struct kref *ref); void dlm_release_callback(struct kref *ref);
void dlm_callback_work(struct work_struct *work);
int dlm_callback_start(struct dlm_ls *ls); int dlm_callback_start(struct dlm_ls *ls);
void dlm_callback_stop(struct dlm_ls *ls); void dlm_callback_stop(struct dlm_ls *ls);
void dlm_callback_suspend(struct dlm_ls *ls); void dlm_callback_suspend(struct dlm_ls *ls);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
* This is the main header file to be included in each DLM source file. * This is the main header file to be included in each DLM source file.
*/ */
#include <uapi/linux/dlm_device.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/sched.h> #include <linux/sched.h>
#include <linux/types.h> #include <linux/types.h>
...@@ -204,8 +205,7 @@ struct dlm_args { ...@@ -204,8 +205,7 @@ struct dlm_args {
#define DLM_IFL_OVERLAP_CANCEL_BIT 20 #define DLM_IFL_OVERLAP_CANCEL_BIT 20
#define DLM_IFL_ENDOFLIFE_BIT 21 #define DLM_IFL_ENDOFLIFE_BIT 21
#define DLM_IFL_DEADLOCK_CANCEL_BIT 24 #define DLM_IFL_DEADLOCK_CANCEL_BIT 24
#define DLM_IFL_CB_PENDING_BIT 25 #define __DLM_IFL_MAX_BIT DLM_IFL_DEADLOCK_CANCEL_BIT
#define __DLM_IFL_MAX_BIT DLM_IFL_CB_PENDING_BIT
/* lkb_dflags */ /* lkb_dflags */
...@@ -217,12 +217,45 @@ struct dlm_args { ...@@ -217,12 +217,45 @@ struct dlm_args {
#define DLM_CB_CAST 0x00000001 #define DLM_CB_CAST 0x00000001
#define DLM_CB_BAST 0x00000002 #define DLM_CB_BAST 0x00000002
/* much of this is just saving user space pointers associated with the
* lock that we pass back to the user lib with an ast
*/
struct dlm_user_args {
struct dlm_user_proc *proc; /* each process that opens the lockspace
* device has private data
* (dlm_user_proc) on the struct file,
* the process's locks point back to it
*/
struct dlm_lksb lksb;
struct dlm_lksb __user *user_lksb;
void __user *castparam;
void __user *castaddr;
void __user *bastparam;
void __user *bastaddr;
uint64_t xid;
};
struct dlm_callback { struct dlm_callback {
uint32_t flags; /* DLM_CBF_ */ uint32_t flags; /* DLM_CBF_ */
int sb_status; /* copy to lksb status */ int sb_status; /* copy to lksb status */
uint8_t sb_flags; /* copy to lksb flags */ uint8_t sb_flags; /* copy to lksb flags */
int8_t mode; /* rq mode of bast, gr mode of cast */ int8_t mode; /* rq mode of bast, gr mode of cast */
int copy_lvb; bool copy_lvb;
struct dlm_lksb *lkb_lksb;
unsigned char lvbptr[DLM_USER_LVB_LEN];
union {
void *astparam; /* caller's ast arg */
struct dlm_user_args ua;
};
struct work_struct work;
void (*bastfn)(void *astparam, int mode);
void (*astfn)(void *astparam);
char res_name[DLM_RESNAME_MAXLEN];
size_t res_length;
uint32_t ls_id;
uint32_t lkb_id;
struct list_head list; struct list_head list;
struct kref ref; struct kref ref;
...@@ -256,10 +289,6 @@ struct dlm_lkb { ...@@ -256,10 +289,6 @@ struct dlm_lkb {
struct list_head lkb_ownqueue; /* list of locks for a process */ struct list_head lkb_ownqueue; /* list of locks for a process */
ktime_t lkb_timestamp; ktime_t lkb_timestamp;
spinlock_t lkb_cb_lock;
struct work_struct lkb_cb_work;
struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
struct list_head lkb_callbacks;
struct dlm_callback *lkb_last_cast; struct dlm_callback *lkb_last_cast;
struct dlm_callback *lkb_last_cb; struct dlm_callback *lkb_last_cb;
int lkb_last_bast_mode; int lkb_last_bast_mode;
...@@ -688,23 +717,6 @@ struct dlm_ls { ...@@ -688,23 +717,6 @@ struct dlm_ls {
#define LSFL_CB_DELAY 9 #define LSFL_CB_DELAY 9
#define LSFL_NODIR 10 #define LSFL_NODIR 10
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
struct dlm_user_args {
struct dlm_user_proc *proc; /* each process that opens the lockspace
device has private data
(dlm_user_proc) on the struct file,
the process's locks point back to it*/
struct dlm_lksb lksb;
struct dlm_lksb __user *user_lksb;
void __user *castparam;
void __user *castaddr;
void __user *bastparam;
void __user *bastaddr;
uint64_t xid;
};
#define DLM_PROC_FLAGS_CLOSING 1 #define DLM_PROC_FLAGS_CLOSING 1
#define DLM_PROC_FLAGS_COMPAT 2 #define DLM_PROC_FLAGS_COMPAT 2
......
...@@ -1203,10 +1203,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, ...@@ -1203,10 +1203,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
kref_init(&lkb->lkb_ref); kref_init(&lkb->lkb_ref);
INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_ownqueue);
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
INIT_LIST_HEAD(&lkb->lkb_cb_list);
INIT_LIST_HEAD(&lkb->lkb_callbacks);
spin_lock_init(&lkb->lkb_cb_lock);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
idr_preload(GFP_NOFS); idr_preload(GFP_NOFS);
spin_lock(&ls->ls_lkbidr_spin); spin_lock(&ls->ls_lkbidr_spin);
...@@ -6003,6 +5999,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, ...@@ -6003,6 +5999,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{ {
struct dlm_callback *cb, *cb_safe;
struct dlm_lkb *lkb, *safe; struct dlm_lkb *lkb, *safe;
dlm_lock_recovery(ls); dlm_lock_recovery(ls);
...@@ -6032,10 +6029,9 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) ...@@ -6032,10 +6029,9 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
} }
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
dlm_purge_lkb_callbacks(lkb); list_del(&cb->list);
list_del_init(&lkb->lkb_cb_list); kref_put(&cb->ref, dlm_release_callback);
dlm_put_lkb(lkb);
} }
spin_unlock(&ls->ls_clear_proc_locks); spin_unlock(&ls->ls_clear_proc_locks);
...@@ -6044,6 +6040,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) ...@@ -6044,6 +6040,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{ {
struct dlm_callback *cb, *cb_safe;
struct dlm_lkb *lkb, *safe; struct dlm_lkb *lkb, *safe;
while (1) { while (1) {
...@@ -6073,10 +6070,9 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) ...@@ -6073,10 +6070,9 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
spin_unlock(&proc->locks_spin); spin_unlock(&proc->locks_spin);
spin_lock(&proc->asts_spin); spin_lock(&proc->asts_spin);
list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
dlm_purge_lkb_callbacks(lkb); list_del(&cb->list);
list_del_init(&lkb->lkb_cb_list); kref_put(&cb->ref, dlm_release_callback);
dlm_put_lkb(lkb);
} }
spin_unlock(&proc->asts_spin); spin_unlock(&proc->asts_spin);
} }
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "dlm_internal.h" #include "dlm_internal.h"
#include "lockspace.h" #include "lockspace.h"
#include "lock.h" #include "lock.h"
#include "lvb_table.h"
#include "user.h" #include "user.h"
#include "ast.h" #include "ast.h"
#include "config.h" #include "config.h"
...@@ -144,24 +145,6 @@ static void compat_output(struct dlm_lock_result *res, ...@@ -144,24 +145,6 @@ static void compat_output(struct dlm_lock_result *res,
} }
#endif #endif
/* should held proc->asts_spin lock */
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
{
struct dlm_callback *cb, *safe;
list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
list_del(&cb->list);
kref_put(&cb->ref, dlm_release_callback);
}
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
/* invalidate */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
lkb->lkb_last_bast_mode = -1;
}
/* Figure out if this lock is at the end of its life and no longer /* Figure out if this lock is at the end of its life and no longer
available for the application to use. The lkb still exists until available for the application to use. The lkb still exists until
the final ast is read. A lock becomes EOL in three situations: the final ast is read. A lock becomes EOL in three situations:
...@@ -198,6 +181,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, ...@@ -198,6 +181,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
struct dlm_ls *ls; struct dlm_ls *ls;
struct dlm_user_args *ua; struct dlm_user_args *ua;
struct dlm_user_proc *proc; struct dlm_user_proc *proc;
struct dlm_callback *cb;
int rv; int rv;
if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) || if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
...@@ -229,11 +213,18 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, ...@@ -229,11 +213,18 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
spin_lock(&proc->asts_spin); spin_lock(&proc->asts_spin);
rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags); rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
switch (rv) { switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED: case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
kref_get(&lkb->lkb_ref); cb->ua = *ua;
list_add_tail(&lkb->lkb_cb_list, &proc->asts); cb->lkb_lksb = &cb->ua.lksb;
if (cb->copy_lvb) {
memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
DLM_USER_LVB_LEN);
cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
}
list_add_tail(&cb->list, &proc->asts);
wake_up_interruptible(&proc->wait); wake_up_interruptible(&proc->wait);
break; break;
case DLM_ENQUEUE_CALLBACK_SUCCESS: case DLM_ENQUEUE_CALLBACK_SUCCESS:
...@@ -801,10 +792,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, ...@@ -801,10 +792,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos) loff_t *ppos)
{ {
struct dlm_user_proc *proc = file->private_data; struct dlm_user_proc *proc = file->private_data;
struct dlm_lkb *lkb;
DECLARE_WAITQUEUE(wait, current); DECLARE_WAITQUEUE(wait, current);
struct dlm_callback *cb; struct dlm_callback *cb;
struct dlm_rsb *rsb;
int rv, ret; int rv, ret;
if (count == sizeof(struct dlm_device_version)) { if (count == sizeof(struct dlm_device_version)) {
...@@ -824,8 +813,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, ...@@ -824,8 +813,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
#endif #endif
return -EINVAL; return -EINVAL;
try_another:
/* do we really need this? can a read happen after a close? */ /* do we really need this? can a read happen after a close? */
if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags)) if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
return -EINVAL; return -EINVAL;
...@@ -860,55 +847,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, ...@@ -860,55 +847,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
without removing lkb_cb_list; so empty lkb_cb_list is always without removing lkb_cb_list; so empty lkb_cb_list is always
consistent with empty lkb_callbacks */ consistent with empty lkb_callbacks */
lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list); cb = list_first_entry(&proc->asts, struct dlm_callback, list);
list_del(&cb->list);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
switch (rv) {
case DLM_DEQUEUE_CALLBACK_EMPTY:
/* this shouldn't happen; lkb should have been removed from
* list when last item was dequeued
*/
log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
list_del_init(&lkb->lkb_cb_list);
spin_unlock(&proc->asts_spin);
/* removes ref for proc->asts, may cause lkb to be freed */
dlm_put_lkb(lkb);
WARN_ON_ONCE(1);
goto try_another;
case DLM_DEQUEUE_CALLBACK_LAST:
list_del_init(&lkb->lkb_cb_list);
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
break;
case DLM_DEQUEUE_CALLBACK_SUCCESS:
break;
default:
WARN_ON_ONCE(1);
break;
}
spin_unlock(&proc->asts_spin); spin_unlock(&proc->asts_spin);
rsb = lkb->lkb_resource;
if (cb->flags & DLM_CB_BAST) { if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(rsb->res_ls->ls_global_id, lkb->lkb_id, trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
cb->mode, rsb->res_name, rsb->res_length); cb->res_length);
} else if (cb->flags & DLM_CB_CAST) { } else if (cb->flags & DLM_CB_CAST) {
lkb->lkb_lksb->sb_status = cb->sb_status; cb->lkb_lksb->sb_status = cb->sb_status;
lkb->lkb_lksb->sb_flags = cb->sb_flags; cb->lkb_lksb->sb_flags = cb->sb_flags;
trace_dlm_ast(rsb->res_ls->ls_global_id, lkb->lkb_id, trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
cb->sb_flags, cb->sb_status, rsb->res_name, cb->sb_flags, cb->res_name, cb->res_length);
rsb->res_length);
} }
ret = copy_result_to_user(lkb->lkb_ua, ret = copy_result_to_user(&cb->ua,
test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags), test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
cb->flags, cb->mode, cb->copy_lvb, buf, count); cb->flags, cb->mode, cb->copy_lvb, buf, count);
kref_put(&cb->ref, dlm_release_callback); kref_put(&cb->ref, dlm_release_callback);
/* removes ref for proc->asts, may cause lkb to be freed */
if (rv == DLM_DEQUEUE_CALLBACK_LAST)
dlm_put_lkb(lkb);
return ret; return ret;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment