Commit aa4e3c8a authored by Mikhail Pershin's avatar Mikhail Pershin Committed by Greg Kroah-Hartman

staging/lustre/llog: MGC to use OSD API for backup logs

MGC uses lvfs API to access local llogs blocking removal of old code

- llog_is_empty() and llog_backup() are introduced

Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2059
Lustre-change: http://review.whamcloud.com/5049
Cc: Levente Kurusa <levex@linux.com>
Signed-off-by: default avatarMikhail Pershin <mike.pershin@intel.com>
Reviewed-by: default avatarLai Siyao <lai.siyao@intel.com>
Reviewed-by: default avatarAlex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: default avatarJames Simmons <uja.ornl@gmail.com>
Reviewed-by: default avatarOleg Drokin <oleg.drokin@intel.com>
[pick client side change only -- Peng Tao]
Signed-off-by: default avatarPeng Tao <bergwolf@gmail.com>
Signed-off-by: default avatarAndreas Dilger <andreas.dilger@intel.com>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 8cc93bc3
......@@ -692,7 +692,7 @@ struct local_oid_storage {
struct dt_object *los_obj;
/* data used to generate new fids */
struct mutex los_id_lock;
struct mutex los_id_lock;
__u64 los_seq;
__u32 los_last_oid;
};
......
......@@ -136,7 +136,11 @@ int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
struct llog_handle **lgh, struct llog_logid *logid,
char *name, enum llog_open_param open_param);
int llog_close(const struct lu_env *env, struct llog_handle *cathandle);
int llog_get_size(struct llog_handle *loghandle);
int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
char *name);
int llog_backup(const struct lu_env *env, struct obd_device *obd,
struct llog_ctxt *ctxt, struct llog_ctxt *bak_ctxt,
char *name, char *backup);
/* llog_process flags */
#define LLOG_FLAG_NODEAMON 0x0001
......@@ -382,6 +386,13 @@ static inline int llog_data_len(int len)
return cfs_size_round(len);
}
static inline int llog_get_size(struct llog_handle *loghandle)
{
if (loghandle && loghandle->lgh_hdr)
return loghandle->lgh_hdr->llh_count;
return 0;
}
static inline struct llog_ctxt *llog_ctxt_get(struct llog_ctxt *ctxt)
{
atomic_inc(&ctxt->loc_refcount);
......
......@@ -399,8 +399,8 @@ struct client_obd {
/* mgc datastruct */
struct semaphore cl_mgc_sem;
struct vfsmount *cl_mgc_vfsmnt;
struct dentry *cl_mgc_configs_dir;
struct local_oid_storage *cl_mgc_los;
struct dt_object *cl_mgc_configs_dir;
atomic_t cl_mgc_refcount;
struct obd_export *cl_mgc_mgsexp;
......
......@@ -99,11 +99,8 @@ static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
static int mgc_cleanup(struct obd_device *obd)
{
struct client_obd *cli = &obd->u.cli;
int rc;
LASSERT(cli->cl_mgc_vfsmnt == NULL);
ptlrpcd_decref();
rc = client_obd_cleanup(obd);
......
......@@ -265,31 +265,6 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
}
EXPORT_SYMBOL(llog_init_handle);
int llog_copy_handler(const struct lu_env *env,
struct llog_handle *llh,
struct llog_rec_hdr *rec,
void *data)
{
struct llog_rec_hdr local_rec = *rec;
struct llog_handle *local_llh = (struct llog_handle *)data;
char *cfg_buf = (char*) (rec + 1);
struct lustre_cfg *lcfg;
int rc = 0;
/* Append all records */
local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
rc = llog_write(env, local_llh, &local_rec, NULL, 0,
(void *)cfg_buf, -1);
lcfg = (struct lustre_cfg *)cfg_buf;
CDEBUG(D_INFO, "idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
return rc;
}
EXPORT_SYMBOL(llog_copy_handler);
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
......@@ -493,14 +468,6 @@ int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
}
EXPORT_SYMBOL(llog_process);
inline int llog_get_size(struct llog_handle *loghandle)
{
if (loghandle && loghandle->lgh_hdr)
return loghandle->lgh_hdr->llh_count;
return 0;
}
EXPORT_SYMBOL(llog_get_size);
int llog_reverse_process(const struct lu_env *env,
struct llog_handle *loghandle, llog_cb_t cb,
void *data, void *catdata)
......@@ -767,8 +734,9 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
struct llog_handle **res, struct llog_logid *logid,
char *name)
{
struct thandle *th;
int rc;
struct dt_device *d;
struct thandle *th;
int rc;
rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
if (rc)
......@@ -777,27 +745,21 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
if (llog_exist(*res))
return 0;
if ((*res)->lgh_obj != NULL) {
struct dt_device *d;
LASSERT((*res)->lgh_obj != NULL);
d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
th = dt_trans_create(env, d);
if (IS_ERR(th))
GOTO(out, rc = PTR_ERR(th));
th = dt_trans_create(env, d);
if (IS_ERR(th))
GOTO(out, rc = PTR_ERR(th));
rc = llog_declare_create(env, *res, th);
if (rc == 0) {
rc = dt_trans_start_local(env, d, th);
if (rc == 0)
rc = llog_create(env, *res, th);
}
dt_trans_stop(env, d, th);
} else {
/* lvfs compat code */
LASSERT((*res)->lgh_file == NULL);
rc = llog_create(env, *res, NULL);
rc = llog_declare_create(env, *res, th);
if (rc == 0) {
rc = dt_trans_start_local(env, d, th);
if (rc == 0)
rc = llog_create(env, *res, th);
}
dt_trans_stop(env, d, th);
out:
if (rc)
llog_close(env, *res);
......@@ -842,41 +804,34 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
int cookiecount, void *buf, int idx)
{
int rc;
struct dt_device *dt;
struct thandle *th;
int rc;
LASSERT(loghandle);
LASSERT(loghandle->lgh_ctxt);
LASSERT(loghandle->lgh_obj != NULL);
if (loghandle->lgh_obj != NULL) {
struct dt_device *dt;
struct thandle *th;
dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
th = dt_trans_create(env, dt);
if (IS_ERR(th))
return PTR_ERR(th);
th = dt_trans_create(env, dt);
if (IS_ERR(th))
return PTR_ERR(th);
rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
if (rc)
GOTO(out_trans, rc);
rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
if (rc)
GOTO(out_trans, rc);
rc = dt_trans_start_local(env, dt, th);
if (rc)
GOTO(out_trans, rc);
rc = dt_trans_start_local(env, dt, th);
if (rc)
GOTO(out_trans, rc);
down_write(&loghandle->lgh_lock);
rc = llog_write_rec(env, loghandle, rec, reccookie,
cookiecount, buf, idx, th);
up_write(&loghandle->lgh_lock);
down_write(&loghandle->lgh_lock);
rc = llog_write_rec(env, loghandle, rec, reccookie,
cookiecount, buf, idx, th);
up_write(&loghandle->lgh_lock);
out_trans:
dt_trans_stop(env, dt, th);
} else { /* lvfs compatibility */
down_write(&loghandle->lgh_lock);
rc = llog_write_rec(env, loghandle, rec, reccookie,
cookiecount, buf, idx, NULL);
up_write(&loghandle->lgh_lock);
}
dt_trans_stop(env, dt, th);
return rc;
}
EXPORT_SYMBOL(llog_write);
......@@ -932,3 +887,104 @@ int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
return rc;
}
EXPORT_SYMBOL(llog_close);
int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
char *name)
{
struct llog_handle *llh;
int rc = 0;
rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
if (rc < 0) {
if (likely(rc == -ENOENT))
rc = 0;
GOTO(out, rc);
}
rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
if (rc)
GOTO(out_close, rc);
rc = llog_get_size(llh);
out_close:
llog_close(env, llh);
out:
/* header is record 1 */
return rc <= 1;
}
EXPORT_SYMBOL(llog_is_empty);
int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *rec, void *data)
{
struct llog_handle *copy_llh = data;
/* Append all records */
return llog_write(env, copy_llh, rec, NULL, 0, NULL, -1);
}
EXPORT_SYMBOL(llog_copy_handler);
/* backup plain llog */
int llog_backup(const struct lu_env *env, struct obd_device *obd,
struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
char *name, char *backup)
{
struct llog_handle *llh, *bllh;
int rc;
/* open original log */
rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
if (rc < 0) {
/* the -ENOENT case is also reported to the caller
* but silently so it should handle that if needed.
*/
if (rc != -ENOENT)
CERROR("%s: failed to open log %s: rc = %d\n",
obd->obd_name, name, rc);
return rc;
}
rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
if (rc)
GOTO(out_close, rc);
/* Make sure there's no old backup log */
rc = llog_erase(env, bctxt, NULL, backup);
if (rc < 0 && rc != -ENOENT)
GOTO(out_close, rc);
/* open backup log */
rc = llog_open_create(env, bctxt, &bllh, NULL, backup);
if (rc) {
CERROR("%s: failed to open backup logfile %s: rc = %d\n",
obd->obd_name, backup, rc);
GOTO(out_close, rc);
}
/* check that backup llog is not the same object as original one */
if (llh->lgh_obj == bllh->lgh_obj) {
CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n",
obd->obd_name, name, backup, llh->lgh_obj,
bllh->lgh_obj);
GOTO(out_backup, rc = -EEXIST);
}
rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL);
if (rc)
GOTO(out_backup, rc);
/* Copy log record by record */
rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
NULL, false);
if (rc)
CERROR("%s: failed to backup log %s: rc = %d\n",
obd->obd_name, name, rc);
out_backup:
llog_close(env, bllh);
out_close:
llog_close(env, llh);
return rc;
}
EXPORT_SYMBOL(llog_backup);
......@@ -855,9 +855,12 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
(*los)->los_seq = fid_seq(first_fid);
(*los)->los_last_oid = le64_to_cpu(lastid);
(*los)->los_obj = o;
/* read value should not be less than initial one */
LASSERTF((*los)->los_last_oid >= first_oid, "%u < %u\n",
(*los)->los_last_oid, first_oid);
/* Read value should not be less than initial one
* but possible after upgrade from older fs.
* In this case just switch to the first_oid in memory and
* it will be updated on disk with first object generated */
if ((*los)->los_last_oid < first_oid)
(*los)->los_last_oid = first_oid;
}
out:
mutex_unlock(&ls->ls_los_mutex);
......
......@@ -29,6 +29,8 @@
*
* Author: Mikhail Pershin <mike.pershin@intel.com>
*/
#ifndef __LOCAL_STORAGE_H
#define __LOCAL_STORAGE_H
#include <dt_object.h>
#include <obd.h>
......@@ -86,3 +88,4 @@ struct los_ondisk {
};
#define LOS_MAGIC 0xdecafbee
#endif
......@@ -429,7 +429,7 @@ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
*/
struct lu_context_key lu_global_key = {
.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
LCT_MG_THREAD | LCT_CL_THREAD,
LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
.lct_init = lu_global_key_init,
.lct_fini = lu_global_key_fini
};
......
......@@ -631,6 +631,9 @@ int lustre_put_lsi(struct super_block *sb)
CDEBUG(D_MOUNT, "put %p %d\n", sb, atomic_read(&lsi->lsi_mounts));
if (atomic_dec_and_test(&lsi->lsi_mounts)) {
if (IS_SERVER(lsi) && lsi->lsi_osd_exp) {
lu_device_put(&lsi->lsi_dt_dev->dd_lu_dev);
lsi->lsi_osd_exp->exp_obd->obd_lvfs_ctxt.dt = NULL;
lsi->lsi_dt_dev = NULL;
obd_disconnect(lsi->lsi_osd_exp);
/* wait till OSD is gone */
obd_zombie_barrier();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment