Commit fff42f21 authored by Heming Zhao's avatar Heming Zhao Committed by Song Liu

md-cluster: fix hanging issue while a new disk adding

The commit 1bbe254e ("md-cluster: check for timeout while a
new disk adding") is correct in terms of code syntax but not
suite real clustered code logic.

When a timeout occurs while adding a new disk, if recv_daemon()
bypasses the unlock for ack_lockres:CR, another node will be waiting
to grab EX lock. This will cause the cluster to hang indefinitely.

How to fix:

1. In dlm_lock_sync(), change the wait behaviour from forever to a
   timeout, This could avoid the hanging issue when another node
   fails to handle cluster msg. Another result of this change is
   that if another node receives an unknown msg (e.g. a new msg_type),
   the old code will hang, whereas the new code will timeout and fail.
   This could help cluster_md handle new msg_type from different
   nodes with different kernel/module versions (e.g. The user only
   updates one leg's kernel and monitors the stability of the new
   kernel).
2. The old code for __sendmsg() always returns 0 (success) under the
   design (must successfully unlock ->message_lockres). This commit
   makes this function return an error number when an error occurs.

Fixes: 1bbe254e ("md-cluster: check for timeout while a new disk adding")
Signed-off-by: default avatarHeming Zhao <heming.zhao@suse.com>
Reviewed-by: default avatarSu Yue <glass.su@suse.com>
Acked-by: default avatarYu Kuai <yukuai3@huawei.com>
Signed-off-by: default avatarSong Liu <song@kernel.org>
Link: https://lore.kernel.org/r/20240709104120.22243-1-heming.zhao@suse.com
parent 3c1743a6
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define LVB_SIZE 64 #define LVB_SIZE 64
#define NEW_DEV_TIMEOUT 5000 #define NEW_DEV_TIMEOUT 5000
#define WAIT_DLM_LOCK_TIMEOUT (30 * HZ)
struct dlm_lock_resource { struct dlm_lock_resource {
dlm_lockspace_t *ls; dlm_lockspace_t *ls;
...@@ -130,8 +131,13 @@ static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) ...@@ -130,8 +131,13 @@ static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
0, sync_ast, res, res->bast); 0, sync_ast, res, res->bast);
if (ret) if (ret)
return ret; return ret;
wait_event(res->sync_locking, res->sync_locking_done); ret = wait_event_timeout(res->sync_locking, res->sync_locking_done,
WAIT_DLM_LOCK_TIMEOUT);
res->sync_locking_done = false; res->sync_locking_done = false;
if (!ret) {
pr_err("locking DLM '%s' timeout!\n", res->name);
return -EBUSY;
}
if (res->lksb.sb_status == 0) if (res->lksb.sb_status == 0)
res->mode = mode; res->mode = mode;
return res->lksb.sb_status; return res->lksb.sb_status;
...@@ -743,7 +749,7 @@ static void unlock_comm(struct md_cluster_info *cinfo) ...@@ -743,7 +749,7 @@ static void unlock_comm(struct md_cluster_info *cinfo)
*/ */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{ {
int error; int error, unlock_error;
int slot = cinfo->slot_number - 1; int slot = cinfo->slot_number - 1;
cmsg->slot = cpu_to_le32(slot); cmsg->slot = cpu_to_le32(slot);
...@@ -751,7 +757,7 @@ static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) ...@@ -751,7 +757,7 @@ static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
if (error) { if (error) {
pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
goto failed_message; return error;
} }
memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
...@@ -781,14 +787,10 @@ static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) ...@@ -781,14 +787,10 @@ static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
} }
failed_ack: failed_ack:
error = dlm_unlock_sync(cinfo->message_lockres); while ((unlock_error = dlm_unlock_sync(cinfo->message_lockres)))
if (unlikely(error != 0)) {
pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
error); unlock_error);
/* in case the message can't be released due to some reason */
goto failed_ack;
}
failed_message:
return error; return error;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment