Commit c994c2eb authored by Goldwyn Rodrigues, committed by Linus Torvalds

ocfs2: use the new DLM operation callbacks while requesting new lockspace

Attempt to use the new DLM operations.  If it is not supported, use the
traditional ocfs2_controld.

To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
It first attempts to take the lock in EX mode (non-blocking).  If
successful (which means it is the first mount), it writes the version
number and downconverts to PR lock.  If it is unsuccessful, it reads the
version from the lock.

If this becomes the standard (with o2cb as well), it could simplify
userspace tools to check if the filesystem is mounted on other nodes.

Dan: Since ocfs2_protocol_version are two u8 values, the additional
checks with LONG* don't make sense.
Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.com>
Signed-off-by: Dan Carpenter <dan.carpenter@oracle.com>
Reviewed-by: Mark Fasheh <mfasheh@suse.de>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Dan Carpenter <dan.carpenter@oracle.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
parent 41503630
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <linux/mutex.h> #include <linux/mutex.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/reboot.h> #include <linux/reboot.h>
#include <linux/sched.h>
#include <asm/uaccess.h> #include <asm/uaccess.h>
#include "stackglue.h" #include "stackglue.h"
...@@ -122,6 +123,7 @@ struct ocfs2_live_connection { ...@@ -122,6 +123,7 @@ struct ocfs2_live_connection {
struct dlm_lksb oc_version_lksb; struct dlm_lksb oc_version_lksb;
char oc_lvb[DLM_LVB_LEN]; char oc_lvb[DLM_LVB_LEN];
struct completion oc_sync_wait; struct completion oc_sync_wait;
wait_queue_head_t oc_wait;
}; };
struct ocfs2_control_private { struct ocfs2_control_private {
...@@ -218,7 +220,7 @@ static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, ...@@ -218,7 +220,7 @@ static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
mutex_lock(&ocfs2_control_lock); mutex_lock(&ocfs2_control_lock);
c->oc_conn = conn; c->oc_conn = conn;
if (atomic_read(&ocfs2_control_opened)) if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
list_add(&c->oc_list, &ocfs2_live_connection_list); list_add(&c->oc_list, &ocfs2_live_connection_list);
else { else {
printk(KERN_ERR printk(KERN_ERR
...@@ -897,6 +899,55 @@ static int version_unlock(struct ocfs2_cluster_connection *conn) ...@@ -897,6 +899,55 @@ static int version_unlock(struct ocfs2_cluster_connection *conn)
return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
} }
/* get_protocol_version()
 *
 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
 * The algorithm is:
 * 1. Attempt to take the lock in EX mode (non-blocking).
 * 2. If successful (which means it is the first mount), write the
 *    version number and downconvert to PR lock.
 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
 *    taking the PR lock.
 *
 * Side effects: running_proto is reset to the plugin's sp_max_proto, and on
 * success the negotiated version is stored in conn->cc_version.
 *
 * Returns 0 on success, -EINVAL if the version read from the LVB has a
 * different major or a larger minor than running_proto, or otherwise the
 * error returned by version_lock().
 */
static int get_protocol_version(struct ocfs2_cluster_connection *conn)
{
	int ret;
	struct ocfs2_live_connection *lc = conn->cc_private;
	struct ocfs2_protocol_version pv;

	/* Start from the highest protocol version this plugin supports. */
	running_proto.pv_major =
		ocfs2_user_plugin.sp_max_proto.pv_major;
	running_proto.pv_minor =
		ocfs2_user_plugin.sp_max_proto.pv_minor;

	/* Point the lock status block at our LVB buffer before locking. */
	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
	ret = version_lock(conn, DLM_LOCK_EX,
			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
	if (!ret) {
		/* We got EX: publish our version through the LVB. */
		conn->cc_version.pv_major = running_proto.pv_major;
		conn->cc_version.pv_minor = running_proto.pv_minor;
		version_to_lvb(&running_proto, lc->oc_lvb);
		/*
		 * Report a failed downconvert to the caller instead of
		 * silently returning success; the result of this call was
		 * previously discarded.
		 */
		ret = version_lock(conn, DLM_LOCK_PR,
				DLM_LKF_CONVERT|DLM_LKF_VALBLK);
	} else if (ret == -EAGAIN) {
		/* EX was refused: take PR and read the published version. */
		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
		if (ret)
			goto out;
		lvb_to_version(lc->oc_lvb, &pv);

		/* Majors must match and the minor must not exceed ours. */
		if ((pv.pv_major != running_proto.pv_major) ||
				(pv.pv_minor > running_proto.pv_minor)) {
			ret = -EINVAL;
			goto out;
		}

		conn->cc_version.pv_major = pv.pv_major;
		conn->cc_version.pv_minor = pv.pv_minor;
	}
out:
	return ret;
}
static void user_recover_prep(void *arg) static void user_recover_prep(void *arg)
{ {
} }
...@@ -925,6 +976,7 @@ static void user_recover_done(void *arg, struct dlm_slot *slots, ...@@ -925,6 +976,7 @@ static void user_recover_done(void *arg, struct dlm_slot *slots,
} }
lc->oc_our_slot = our_slot; lc->oc_our_slot = our_slot;
wake_up(&lc->oc_wait);
} }
const struct dlm_lockspace_ops ocfs2_ls_ops = { const struct dlm_lockspace_ops ocfs2_ls_ops = {
...@@ -933,11 +985,21 @@ const struct dlm_lockspace_ops ocfs2_ls_ops = { ...@@ -933,11 +985,21 @@ const struct dlm_lockspace_ops ocfs2_ls_ops = {
.recover_done = user_recover_done, .recover_done = user_recover_done,
}; };
/*
 * Tear down a cluster connection: release the version lock, release the
 * DLM lockspace, drop the live connection kept in conn->cc_private, and
 * clear both pointers on conn.  Always returns 0.
 */
static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
{
	struct ocfs2_live_connection *lc;

	/* Release the version lock while the lockspace still exists. */
	version_unlock(conn);

	dlm_release_lockspace(conn->cc_lockspace, 2);
	conn->cc_lockspace = NULL;

	lc = conn->cc_private;
	ocfs2_live_connection_drop(lc);
	conn->cc_private = NULL;

	return 0;
}
static int user_cluster_connect(struct ocfs2_cluster_connection *conn) static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
{ {
dlm_lockspace_t *fsdlm; dlm_lockspace_t *fsdlm;
struct ocfs2_live_connection *lc; struct ocfs2_live_connection *lc;
int rc; int rc, ops_rv;
BUG_ON(conn == NULL); BUG_ON(conn == NULL);
...@@ -947,11 +1009,44 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn) ...@@ -947,11 +1009,44 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
goto out; goto out;
} }
lc->oc_type = WITH_CONTROLD; init_waitqueue_head(&lc->oc_wait);
init_completion(&lc->oc_sync_wait);
atomic_set(&lc->oc_this_node, 0);
conn->cc_private = lc;
lc->oc_type = NO_CONTROLD;
rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
DLM_LSFL_FS, DLM_LVB_LEN,
&ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
if (rc)
goto out;
if (ops_rv == -EOPNOTSUPP) {
lc->oc_type = WITH_CONTROLD;
printk(KERN_NOTICE "ocfs2: You seem to be using an older "
"version of dlm_controld and/or ocfs2-tools."
" Please consider upgrading.\n");
} else if (ops_rv) {
rc = ops_rv;
goto out;
}
conn->cc_lockspace = fsdlm;
rc = ocfs2_live_connection_attach(conn, lc); rc = ocfs2_live_connection_attach(conn, lc);
if (rc) if (rc)
goto out; goto out;
if (lc->oc_type == NO_CONTROLD) {
rc = get_protocol_version(conn);
if (rc) {
printk(KERN_ERR "ocfs2: Could not determine"
" locking version\n");
user_cluster_disconnect(conn);
goto out;
}
wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
}
/* /*
* running_proto must have been set before we allowed any mounts * running_proto must have been set before we allowed any mounts
* to proceed. * to proceed.
...@@ -959,40 +1054,20 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn) ...@@ -959,40 +1054,20 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
if (fs_protocol_compare(&running_proto, &conn->cc_version)) { if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
printk(KERN_ERR printk(KERN_ERR
"Unable to mount with fs locking protocol version " "Unable to mount with fs locking protocol version "
"%u.%u because the userspace control daemon has " "%u.%u because negotiated protocol is %u.%u\n",
"negotiated %u.%u\n",
conn->cc_version.pv_major, conn->cc_version.pv_minor, conn->cc_version.pv_major, conn->cc_version.pv_minor,
running_proto.pv_major, running_proto.pv_minor); running_proto.pv_major, running_proto.pv_minor);
rc = -EPROTO; rc = -EPROTO;
ocfs2_live_connection_drop(lc); ocfs2_live_connection_drop(lc);
lc = NULL; lc = NULL;
goto out;
}
rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
NULL, NULL, NULL, &fsdlm);
if (rc) {
ocfs2_live_connection_drop(lc);
lc = NULL;
goto out;
} }
conn->cc_private = lc;
conn->cc_lockspace = fsdlm;
out: out:
if (rc && lc) if (rc && lc)
kfree(lc); kfree(lc);
return rc; return rc;
} }
/*
 * Tear down a cluster connection: release the DLM lockspace, drop the live
 * connection stored in conn->cc_private, and clear both pointers on conn.
 * Always returns 0.
 *
 * NOTE(review): this appears to be the pre-patch copy shown on the removed
 * side of the diff; unlike the relocated copy above user_cluster_connect(),
 * it does not call version_unlock() first.
 */
static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
{
/* NOTE(review): 2 is the "force" argument; confirm its meaning against the DLM API. */
dlm_release_lockspace(conn->cc_lockspace, 2);
conn->cc_lockspace = NULL;
ocfs2_live_connection_drop(conn->cc_private);
conn->cc_private = NULL;
return 0;
}
static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
unsigned int *this_node) unsigned int *this_node)
...@@ -1002,8 +1077,11 @@ static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, ...@@ -1002,8 +1077,11 @@ static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
if (lc->oc_type == WITH_CONTROLD) if (lc->oc_type == WITH_CONTROLD)
rc = ocfs2_control_get_this_node(); rc = ocfs2_control_get_this_node();
else if (lc->oc_type == NO_CONTROLD)
rc = atomic_read(&lc->oc_this_node);
else else
rc = -EINVAL; rc = -EINVAL;
if (rc < 0) if (rc < 0)
return rc; return rc;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment