Commit a2bf0477 authored by Kurt Hackel, committed by Mark Fasheh

ocfs2: mle ref counting fixes

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
parent 95883719
...@@ -74,6 +74,7 @@ struct dlm_master_list_entry ...@@ -74,6 +74,7 @@ struct dlm_master_list_entry
wait_queue_head_t wq; wait_queue_head_t wq;
atomic_t woken; atomic_t woken;
struct kref mle_refs; struct kref mle_refs;
int inuse;
unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
...@@ -337,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, ...@@ -337,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
} }
/* Mark an mle as "in use" and take an extra kref on it.
 *
 * The inuse count keeps the mle pinned across a window where another
 * path (e.g. the assert_master handler) might otherwise drop the last
 * reference; the paired kref_get provides the actual lifetime guarantee.
 * Balanced by dlm_put_mle_inuse().
 *
 * Caller must already hold both dlm->spinlock and dlm->master_lock,
 * which is why this takes no locks itself and only asserts them. */
static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
struct dlm_ctxt *dlm;
dlm = mle->dlm;
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&dlm->master_lock);
mle->inuse++;
kref_get(&mle->mle_refs);
}
/* Drop the "in use" pin taken by dlm_get_mle_inuse() and release the
 * matching reference via __dlm_put_mle().
 *
 * Unlike the get side, callers hold no locks here, so both
 * dlm->spinlock and dlm->master_lock are acquired — in that order,
 * matching the lock ordering used elsewhere in this file — because
 * __dlm_put_mle() requires them held.  The inuse decrement must happen
 * before the put so that __dlm_put_mle() sees the updated count.
 * NOTE(review): __dlm_put_mle() is defined below this function in the
 * diff; the full file presumably carries a forward declaration — verify. */
static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
{
struct dlm_ctxt *dlm;
dlm = mle->dlm;
spin_lock(&dlm->spinlock);
spin_lock(&dlm->master_lock);
mle->inuse--;
__dlm_put_mle(mle);
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
}
/* remove from list and free */ /* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle) static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{ {
...@@ -390,6 +416,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, ...@@ -390,6 +416,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
memset(mle->response_map, 0, sizeof(mle->response_map)); memset(mle->response_map, 0, sizeof(mle->response_map));
mle->master = O2NM_MAX_NODES; mle->master = O2NM_MAX_NODES;
mle->new_master = O2NM_MAX_NODES; mle->new_master = O2NM_MAX_NODES;
mle->inuse = 0;
if (mle->type == DLM_MLE_MASTER) { if (mle->type == DLM_MLE_MASTER) {
BUG_ON(!res); BUG_ON(!res);
...@@ -809,7 +836,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, ...@@ -809,7 +836,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
* if so, the creator of the BLOCK may try to put the last * if so, the creator of the BLOCK may try to put the last
* ref at this time in the assert master handler, so we * ref at this time in the assert master handler, so we
* need an extra one to keep from a bad ptr deref. */ * need an extra one to keep from a bad ptr deref. */
dlm_get_mle(mle); dlm_get_mle_inuse(mle);
spin_unlock(&dlm->master_lock); spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
...@@ -899,7 +926,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, ...@@ -899,7 +926,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
dlm_mle_detach_hb_events(dlm, mle); dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle); dlm_put_mle(mle);
/* put the extra ref */ /* put the extra ref */
dlm_put_mle(mle); dlm_put_mle_inuse(mle);
wake_waiters: wake_waiters:
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
...@@ -1753,6 +1780,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -1753,6 +1780,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
if (mle) { if (mle) {
int extra_ref = 0; int extra_ref = 0;
int nn = -1; int nn = -1;
int rr, err = 0;
spin_lock(&mle->spinlock); spin_lock(&mle->spinlock);
if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
...@@ -1772,27 +1800,64 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -1772,27 +1800,64 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
wake_up(&mle->wq); wake_up(&mle->wq);
spin_unlock(&mle->spinlock); spin_unlock(&mle->spinlock);
if (mle->type == DLM_MLE_MIGRATION && res) { if (res) {
spin_lock(&res->spinlock);
if (mle->type == DLM_MLE_MIGRATION) {
mlog(0, "finishing off migration of lockres %.*s, " mlog(0, "finishing off migration of lockres %.*s, "
"from %u to %u\n", "from %u to %u\n",
res->lockname.len, res->lockname.name, res->lockname.len, res->lockname.name,
dlm->node_num, mle->new_master); dlm->node_num, mle->new_master);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING; res->state &= ~DLM_LOCK_RES_MIGRATING;
dlm_change_lockres_owner(dlm, res, mle->new_master); dlm_change_lockres_owner(dlm, res, mle->new_master);
BUG_ON(res->state & DLM_LOCK_RES_DIRTY); BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
} else {
dlm_change_lockres_owner(dlm, res, mle->master);
}
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
} }
/* master is known, detach if not already detached */
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
/* master is known, detach if not already detached.
* ensures that only one assert_master call will happen
* on this mle. */
spin_lock(&dlm->spinlock);
spin_lock(&dlm->master_lock);
rr = atomic_read(&mle->mle_refs.refcount);
if (mle->inuse > 0) {
if (extra_ref && rr < 3)
err = 1;
else if (!extra_ref && rr < 2)
err = 1;
} else {
if (extra_ref && rr < 2)
err = 1;
else if (!extra_ref && rr < 1)
err = 1;
}
if (err) {
mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
"that will mess up this node, refs=%d, extra=%d, "
"inuse=%d\n", dlm->name, namelen, name,
assert->node_idx, rr, extra_ref, mle->inuse);
dlm_print_one_mle(mle);
}
list_del_init(&mle->list);
__dlm_mle_detach_hb_events(dlm, mle);
__dlm_put_mle(mle);
if (extra_ref) { if (extra_ref) {
/* the assert master message now balances the extra /* the assert master message now balances the extra
* ref given by the master / migration request message. * ref given by the master / migration request message.
* if this is the last put, it will be removed * if this is the last put, it will be removed
* from the list. */ * from the list. */
dlm_put_mle(mle); __dlm_put_mle(mle);
}
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
} else if (res) {
if (res->owner != assert->node_idx) {
mlog(0, "assert_master from %u, but current "
"owner is %u (%.*s), no mle\n", assert->node_idx,
res->owner, namelen, name);
} }
} }
...@@ -2138,7 +2203,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ...@@ -2138,7 +2203,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* take both dlm->spinlock and dlm->master_lock */ * take both dlm->spinlock and dlm->master_lock */
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
spin_lock(&dlm->master_lock); spin_lock(&dlm->master_lock);
dlm_get_mle(mle); dlm_get_mle_inuse(mle);
spin_unlock(&dlm->master_lock); spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
...@@ -2155,7 +2220,10 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ...@@ -2155,7 +2220,10 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
/* migration failed, detach and clean up mle */ /* migration failed, detach and clean up mle */
dlm_mle_detach_hb_events(dlm, mle); dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle); dlm_put_mle(mle);
dlm_put_mle(mle); dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
spin_unlock(&res->spinlock);
goto leave; goto leave;
} }
...@@ -2196,7 +2264,10 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ...@@ -2196,7 +2264,10 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
/* migration failed, detach and clean up mle */ /* migration failed, detach and clean up mle */
dlm_mle_detach_hb_events(dlm, mle); dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle); dlm_put_mle(mle);
dlm_put_mle(mle); dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
spin_unlock(&res->spinlock);
goto leave; goto leave;
} }
/* TODO: if node died: stop, clean up, return error */ /* TODO: if node died: stop, clean up, return error */
...@@ -2212,7 +2283,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ...@@ -2212,7 +2283,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
/* master is known, detach if not already detached */ /* master is known, detach if not already detached */
dlm_mle_detach_hb_events(dlm, mle); dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle); dlm_put_mle_inuse(mle);
ret = 0; ret = 0;
dlm_lockres_calc_usage(dlm, res); dlm_lockres_calc_usage(dlm, res);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment