Commit 8dfb790b authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client

Pull Ceph updates from Ilya Dryomov:
 "The big ticket item here is support for rbd exclusive-lock feature,
  with maintenance operations offloaded to userspace (Douglas Fuller,
  Mike Christie and myself). Another block device bullet is a series
  fixing up layering error paths (myself).

  On the filesystem side, we've got patches that improve our handling of
  buffered vs dio write races (Neil Brown) and a few assorted fixes from
  Zheng. Also included a couple of random cleanups and a minor CRUSH
  update"

* tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client: (39 commits)
  crush: remove redundant local variable
  crush: don't normalize input of crush_ln iteratively
  libceph: ceph_build_auth() doesn't need ceph_auth_build_hello()
  libceph: use CEPH_AUTH_UNKNOWN in ceph_auth_build_hello()
  ceph: fix description for rsize and rasize mount options
  rbd: use kmalloc_array() in rbd_header_from_disk()
  ceph: use list_move instead of list_del/list_add
  ceph: handle CEPH_SESSION_REJECT message
  ceph: avoid accessing / when mounting a subpath
  ceph: fix mandatory flock check
  ceph: remove warning when ceph_releasepage() is called on dirty page
  ceph: ignore error from invalidate_inode_pages2_range() in direct write
  ceph: fix error handling of start_read()
  rbd: add rbd_obj_request_error() helper
  rbd: img_data requests don't own their page array
  rbd: don't call rbd_osd_req_format_read() for !img_data requests
  rbd: rework rbd_img_obj_exists_submit() error paths
  rbd: don't crash or leak on errors in rbd_img_obj_parent_read_full_callback()
  rbd: move bumping img_request refcount into rbd_obj_request_submit()
  rbd: mark the original request as done if stat request fails
  ...
parents fed41f7d 64f77566
......@@ -6,7 +6,7 @@ Description:
Being used for adding and removing rbd block devices.
Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
Usage: <mon ip addr> <options> <pool name> <rbd image name> [<snap name>]
$ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
......@@ -14,9 +14,13 @@ The snapshot name can be "-" or omitted to map the image read/write. A <dev-id>
will be assigned for any registered block device. If snapshot is used, it will
be mapped read-only.
Removal of a device:
Usage: <dev-id> [force]
$ echo <dev-id> > /sys/bus/rbd/remove
$ echo 2 > /sys/bus/rbd/remove
Optional "force" argument which when passed will wait for running requests and
then unmap the image. Requests sent to the driver after initiating the removal
will be failed. (August 2016, since 4.9.)
What: /sys/bus/rbd/add_single_major
Date: December 2013
......@@ -43,10 +47,25 @@ Description: Available only if rbd module is inserted with single_major
Entries under /sys/bus/rbd/devices/<dev-id>/
--------------------------------------------
client_addr
The ceph unique client entity_addr_t (address + nonce).
The format is <address>:<port>/<nonce>: '1.2.3.4:1234/5678' or
'[1:2:3:4:5:6:7:8]:1234/5678'. (August 2016, since 4.9.)
client_id
The ceph unique client id that was assigned for this specific session.
cluster_fsid
The ceph cluster UUID. (August 2016, since 4.9.)
config_info
The string written into /sys/bus/rbd/add{,_single_major}. (August
2016, since 4.9.)
features
A hexadecimal encoding of the feature bits for this image.
......@@ -92,6 +111,10 @@ current_snap
The current snapshot for which the device is mapped.
snap_id
The current snapshot's id. (August 2016, since 4.9.)
parent
Information identifying the chain of parent images in a layered rbd
......
......@@ -98,6 +98,10 @@ Mount Options
size.
rsize=X
Specify the maximum read size in bytes. By default there is no
maximum.
rasize=X
Specify the maximum readahead.
mount_timeout=X
......
......@@ -31,6 +31,7 @@
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>
......@@ -114,12 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_OBJ_PREFIX_LEN_MAX 64
#define RBD_NOTIFY_TIMEOUT 5 /* seconds */
#define RBD_RETRY_DELAY msecs_to_jiffies(1000)
/* Feature bits */
#define RBD_FEATURE_LAYERING (1<<0)
#define RBD_FEATURE_STRIPINGV2 (1<<1)
#define RBD_FEATURES_ALL \
(RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
RBD_FEATURE_STRIPINGV2 | \
RBD_FEATURE_EXCLUSIVE_LOCK)
/* Features supported by this (client software) implementation. */
......@@ -128,11 +134,8 @@ static int atomic_dec_return_safe(atomic_t *v)
/*
* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
* MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
* enough to hold all possible device names.
*/
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/*
* block device image metadata (in-memory version)
......@@ -322,6 +325,24 @@ struct rbd_img_request {
#define for_each_obj_request_safe(ireq, oreq, n) \
list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
enum rbd_watch_state {
RBD_WATCH_STATE_UNREGISTERED,
RBD_WATCH_STATE_REGISTERED,
RBD_WATCH_STATE_ERROR,
};
enum rbd_lock_state {
RBD_LOCK_STATE_UNLOCKED,
RBD_LOCK_STATE_LOCKED,
RBD_LOCK_STATE_RELEASING,
};
/* WatchNotify::ClientId */
struct rbd_client_id {
u64 gid;
u64 handle;
};
struct rbd_mapping {
u64 size;
u64 features;
......@@ -349,13 +370,29 @@ struct rbd_device {
unsigned long flags; /* possibly lock protected */
struct rbd_spec *spec;
struct rbd_options *opts;
char *config_info; /* add{,_single_major} string */
struct ceph_object_id header_oid;
struct ceph_object_locator header_oloc;
struct ceph_file_layout layout;
struct ceph_file_layout layout; /* used for all rbd requests */
struct mutex watch_mutex;
enum rbd_watch_state watch_state;
struct ceph_osd_linger_request *watch_handle;
u64 watch_cookie;
struct delayed_work watch_dwork;
struct rw_semaphore lock_rwsem;
enum rbd_lock_state lock_state;
struct rbd_client_id owner_cid;
struct work_struct acquired_lock_work;
struct work_struct released_lock_work;
struct delayed_work lock_dwork;
struct work_struct unlock_work;
wait_queue_head_t lock_waitq;
struct workqueue_struct *task_wq;
struct rbd_spec *parent_spec;
u64 parent_overlap;
......@@ -439,6 +476,29 @@ static int minor_to_rbd_dev_id(int minor)
return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
rbd_dev->spec->snap_id == CEPH_NOSNAP &&
!rbd_dev->mapping.read_only;
}
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}
static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
bool is_lock_owner;
down_read(&rbd_dev->lock_rwsem);
is_lock_owner = __rbd_is_lock_owner(rbd_dev);
up_read(&rbd_dev->lock_rwsem);
return is_lock_owner;
}
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
......@@ -735,6 +795,7 @@ enum {
/* string args above */
Opt_read_only,
Opt_read_write,
Opt_lock_on_read,
Opt_err
};
......@@ -746,16 +807,19 @@ static match_table_t rbd_opts_tokens = {
{Opt_read_only, "ro"}, /* Alternate spelling */
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
{Opt_lock_on_read, "lock_on_read"},
{Opt_err, NULL}
};
struct rbd_options {
int queue_depth;
bool read_only;
bool lock_on_read;
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
static int parse_rbd_opts_token(char *c, void *private)
{
......@@ -791,6 +855,9 @@ static int parse_rbd_opts_token(char *c, void *private)
case Opt_read_write:
rbd_opts->read_only = false;
break;
case Opt_lock_on_read:
rbd_opts->lock_on_read = true;
break;
default:
/* libceph prints "bad option" msg */
return -EINVAL;
......@@ -919,7 +986,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
char *snap_names = NULL;
u64 *snap_sizes = NULL;
u32 snap_count;
size_t size;
int ret = -ENOMEM;
u32 i;
......@@ -957,9 +1023,9 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
goto out_err;
/* ...as well as the array of their sizes. */
size = snap_count * sizeof (*header->snap_sizes);
snap_sizes = kmalloc(size, GFP_KERNEL);
snap_sizes = kmalloc_array(snap_count,
sizeof(*header->snap_sizes),
GFP_KERNEL);
if (!snap_sizes)
goto out_err;
......@@ -1551,11 +1617,18 @@ static bool obj_request_type_valid(enum obj_request_type type)
}
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
dout("%s %p\n", __func__, obj_request);
return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
struct ceph_osd_request *osd_req = obj_request->osd_req;
dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
if (obj_request_img_data_test(obj_request)) {
WARN_ON(obj_request->callback != rbd_img_obj_callback);
rbd_img_request_get(obj_request->img_request);
}
ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}
static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
......@@ -1745,6 +1818,22 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
complete_all(&obj_request->completion);
}
static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
{
obj_request->result = err;
obj_request->xferred = 0;
/*
* kludge - mirror rbd_obj_request_submit() to match a put in
* rbd_img_obj_callback()
*/
if (obj_request_img_data_test(obj_request)) {
WARN_ON(obj_request->callback != rbd_img_obj_callback);
rbd_img_request_get(obj_request->img_request);
}
obj_request_done_set(obj_request);
rbd_obj_request_complete(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = NULL;
......@@ -1877,11 +1966,10 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
if (img_request)
osd_req->r_snapid = img_request->snap_id;
rbd_assert(obj_request_img_data_test(obj_request));
osd_req->r_snapid = obj_request->img_request->snap_id;
}
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
......@@ -2074,7 +2162,9 @@ static void rbd_obj_request_destroy(struct kref *kref)
bio_chain_put(obj_request->bio_list);
break;
case OBJ_REQUEST_PAGES:
if (obj_request->pages)
/* img_data requests don't own their page array */
if (obj_request->pages &&
!obj_request_img_data_test(obj_request))
ceph_release_page_vector(obj_request->pages,
obj_request->page_count);
break;
......@@ -2295,13 +2385,6 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
xferred = obj_request->length;
}
/* Image object requests don't own their page array */
if (obj_request->type == OBJ_REQUEST_PAGES) {
obj_request->pages = NULL;
obj_request->page_count = 0;
}
if (img_request_child_test(img_request)) {
rbd_assert(img_request->obj_request != NULL);
more = obj_request->which < img_request->obj_request_count - 1;
......@@ -2520,8 +2603,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
rbd_img_request_get(img_request);
img_offset += length;
resid -= length;
}
......@@ -2579,7 +2660,6 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
struct rbd_obj_request *orig_request;
struct ceph_osd_request *osd_req;
struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev;
struct page **pages;
enum obj_operation_type op_type;
......@@ -2603,7 +2683,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
rbd_assert(obj_request_type_valid(orig_request->type));
img_result = img_request->result;
parent_length = img_request->length;
rbd_assert(parent_length == img_request->xferred);
rbd_assert(img_result || parent_length == img_request->xferred);
rbd_img_request_put(img_request);
rbd_assert(orig_request->img_request);
......@@ -2616,13 +2696,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
* and re-submit the original write request.
*/
if (!rbd_dev->parent_overlap) {
struct ceph_osd_client *osdc;
ceph_release_page_vector(pages, page_count);
osdc = &rbd_dev->rbd_client->client->osdc;
img_result = rbd_obj_request_submit(osdc, orig_request);
if (!img_result)
return;
rbd_obj_request_submit(orig_request);
return;
}
if (img_result)
......@@ -2656,17 +2732,12 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
/* All set, send it off. */
osdc = &rbd_dev->rbd_client->client->osdc;
img_result = rbd_obj_request_submit(osdc, orig_request);
if (!img_result)
return;
out_err:
/* Record the error code and complete the request */
rbd_obj_request_submit(orig_request);
return;
orig_request->result = img_result;
orig_request->xferred = 0;
obj_request_done_set(orig_request);
rbd_obj_request_complete(orig_request);
out_err:
ceph_release_page_vector(pages, page_count);
rbd_obj_request_error(orig_request, img_result);
}
/*
......@@ -2680,26 +2751,19 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
* When the read completes, this page array will be transferred to
* the original object request for the copyup operation.
*
* If an error occurs, record it as the result of the original
* object request and mark it done so it gets completed.
* If an error occurs, it is recorded as the result of the original
* object request in rbd_img_obj_exists_callback().
*/
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = NULL;
struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
struct rbd_img_request *parent_request = NULL;
struct rbd_device *rbd_dev;
u64 img_offset;
u64 length;
struct page **pages = NULL;
u32 page_count;
int result;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request_type_valid(obj_request->type));
img_request = obj_request->img_request;
rbd_assert(img_request != NULL);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL);
/*
......@@ -2740,10 +2804,11 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
if (result)
goto out_err;
parent_request->copyup_pages = pages;
parent_request->copyup_page_count = page_count;
parent_request->callback = rbd_img_obj_parent_read_full_callback;
result = rbd_img_request_submit(parent_request);
if (!result)
return 0;
......@@ -2757,10 +2822,6 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
ceph_release_page_vector(pages, page_count);
if (parent_request)
rbd_img_request_put(parent_request);
obj_request->result = result;
obj_request->xferred = 0;
obj_request_done_set(obj_request);
return result;
}
......@@ -2793,17 +2854,13 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
/*
* If the overlap has become 0 (most likely because the
* image has been flattened) we need to free the pages
* and re-submit the original write request.
* image has been flattened) we need to re-submit the
* original request.
*/
rbd_dev = orig_request->img_request->rbd_dev;
if (!rbd_dev->parent_overlap) {
struct ceph_osd_client *osdc;
osdc = &rbd_dev->rbd_client->client->osdc;
result = rbd_obj_request_submit(osdc, orig_request);
if (!result)
return;
rbd_obj_request_submit(orig_request);
return;
}
/*
......@@ -2816,31 +2873,45 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
obj_request_existence_set(orig_request, true);
} else if (result == -ENOENT) {
obj_request_existence_set(orig_request, false);
} else if (result) {
orig_request->result = result;
goto out;
} else {
goto fail_orig_request;
}
/*
* Resubmit the original request now that we have recorded
* whether the target object exists.
*/
orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
if (orig_request->result)
rbd_obj_request_complete(orig_request);
result = rbd_img_obj_request_submit(orig_request);
if (result)
goto fail_orig_request;
return;
fail_orig_request:
rbd_obj_request_error(orig_request, result);
}
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
struct rbd_obj_request *stat_request;
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct page **pages = NULL;
struct page **pages;
u32 page_count;
size_t size;
int ret;
stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
OBJ_REQUEST_PAGES);
if (!stat_request)
return -ENOMEM;
stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
stat_request);
if (!stat_request->osd_req) {
ret = -ENOMEM;
goto fail_stat_request;
}
/*
* The response data for a STAT call consists of:
* le64 length;
......@@ -2852,52 +2923,33 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
page_count = (u32)calc_pages_for(0, size);
pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
goto fail_stat_request;
}
ret = -ENOMEM;
stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
OBJ_REQUEST_PAGES);
if (!stat_request)
goto out;
osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
false, false);
rbd_obj_request_get(obj_request);
stat_request->obj_request = obj_request;
stat_request->pages = pages;
stat_request->page_count = page_count;
rbd_assert(obj_request->img_request);
rbd_dev = obj_request->img_request->rbd_dev;
stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
stat_request);
if (!stat_request->osd_req)
goto out;
stat_request->callback = rbd_img_obj_exists_callback;
osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
false, false);
rbd_osd_req_format_read(stat_request);
osdc = &rbd_dev->rbd_client->client->osdc;
ret = rbd_obj_request_submit(osdc, stat_request);
out:
if (ret)
rbd_obj_request_put(obj_request);
rbd_obj_request_submit(stat_request);
return 0;
fail_stat_request:
rbd_obj_request_put(stat_request);
return ret;
}
static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request;
struct rbd_device *rbd_dev;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
rbd_dev = img_request->rbd_dev;
struct rbd_img_request *img_request = obj_request->img_request;
struct rbd_device *rbd_dev = img_request->rbd_dev;
/* Reads */
if (!img_request_write_test(img_request) &&
......@@ -2936,14 +2988,13 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
if (img_obj_request_simple(obj_request)) {
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
rbd_dev = obj_request->img_request->rbd_dev;
osdc = &rbd_dev->rbd_client->client->osdc;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request_type_valid(obj_request->type));
rbd_assert(obj_request->img_request);
return rbd_obj_request_submit(osdc, obj_request);
if (img_obj_request_simple(obj_request)) {
rbd_obj_request_submit(obj_request);
return 0;
}
/*
......@@ -3006,12 +3057,8 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
rbd_assert(obj_request->img_request);
rbd_dev = obj_request->img_request->rbd_dev;
if (!rbd_dev->parent_overlap) {
struct ceph_osd_client *osdc;
osdc = &rbd_dev->rbd_client->client->osdc;
img_result = rbd_obj_request_submit(osdc, obj_request);
if (!img_result)
return;
rbd_obj_request_submit(obj_request);
return;
}
obj_request->result = img_result;
......@@ -3084,153 +3131,897 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
obj_request_done_set(obj_request);
}
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
static const struct rbd_client_id rbd_empty_cid;
static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len)
static bool rbd_cid_equal(const struct rbd_client_id *lhs,
const struct rbd_client_id *rhs)
{
return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
}
static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
{
struct rbd_client_id cid;
mutex_lock(&rbd_dev->watch_mutex);
cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
cid.handle = rbd_dev->watch_cookie;
mutex_unlock(&rbd_dev->watch_mutex);
return cid;
}
/*
* lock_rwsem must be held for write
*/
static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
const struct rbd_client_id *cid)
{
dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
cid->gid, cid->handle);
rbd_dev->owner_cid = *cid; /* struct */
}
static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
{
mutex_lock(&rbd_dev->watch_mutex);
sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
mutex_unlock(&rbd_dev->watch_mutex);
}
/*
* lock_rwsem must be held for write
*/
static int rbd_lock(struct rbd_device *rbd_dev)
{
struct rbd_device *rbd_dev = arg;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_client_id cid = rbd_get_cid(rbd_dev);
char cookie[32];
int ret;
dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
cookie, notify_id);
WARN_ON(__rbd_is_lock_owner(rbd_dev));
/*
* Until adequate refresh error handling is in place, there is
* not much we can do here, except warn.
*
* See http://tracker.ceph.com/issues/5040
*/
ret = rbd_dev_refresh(rbd_dev);
format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
RBD_LOCK_TAG, "", 0);
if (ret)
rbd_warn(rbd_dev, "refresh failed: %d", ret);
return ret;
ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, notify_id, cookie,
NULL, 0);
if (ret)
rbd_warn(rbd_dev, "notify_ack ret %d", ret);
rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
rbd_set_owner_cid(rbd_dev, &cid);
queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
return 0;
}
static void rbd_watch_errcb(void *arg, u64 cookie, int err)
/*
* lock_rwsem must be held for write
*/
static int rbd_unlock(struct rbd_device *rbd_dev)
{
struct rbd_device *rbd_dev = arg;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
char cookie[32];
int ret;
rbd_warn(rbd_dev, "encountered watch error: %d", err);
WARN_ON(!__rbd_is_lock_owner(rbd_dev));
__rbd_dev_header_unwatch_sync(rbd_dev);
rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
ret = rbd_dev_header_watch_sync(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
return;
format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
RBD_LOCK_NAME, cookie);
if (ret && ret != -ENOENT) {
rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
return ret;
}
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
return 0;
}
/*
* Initiate a watch request, synchronously.
*/
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
enum rbd_notify_op notify_op,
struct page ***preply_pages,
size_t *preply_len)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_linger_request *handle;
struct rbd_client_id cid = rbd_get_cid(rbd_dev);
int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
char buf[buf_size];
void *p = buf;
rbd_assert(!rbd_dev->watch_handle);
dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, rbd_watch_cb,
rbd_watch_errcb, rbd_dev);
if (IS_ERR(handle))
return PTR_ERR(handle);
/* encode *LockPayload NotifyMessage (op + ClientId) */
ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_32(&p, notify_op);
ceph_encode_64(&p, cid.gid);
ceph_encode_64(&p, cid.handle);
rbd_dev->watch_handle = handle;
return 0;
return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, buf, buf_size,
RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
}
static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
enum rbd_notify_op notify_op)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret;
struct page **reply_pages;
size_t reply_len;
if (!rbd_dev->watch_handle)
return;
__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
}
ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
if (ret)
rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
static void rbd_notify_acquired_lock(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
acquired_lock_work);
rbd_dev->watch_handle = NULL;
rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
}
/*
* Tear down a watch request, synchronously.
*/
static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
static void rbd_notify_released_lock(struct work_struct *work)
{
__rbd_dev_header_unwatch_sync(rbd_dev);
struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
released_lock_work);
dout("%s flushing notifies\n", __func__);
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
}
/*
* Synchronous osd object method call. Returns the number of bytes
* returned in the outbound buffer, or a negative error code.
*/
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
const char *object_name,
const char *class_name,
const char *method_name,
const void *outbound,
size_t outbound_size,
void *inbound,
size_t inbound_size)
static int rbd_request_lock(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
struct page **pages;
u32 page_count;
struct page **reply_pages;
size_t reply_len;
bool lock_owner_responded = false;
int ret;
/*
* Method calls are ultimately read operations. The result
* should placed into the inbound buffer provided. They
* also supply outbound data--parameters for the object
* method. Currently if this is present it will be a
* snapshot id.
*/
page_count = (u32)calc_pages_for(0, inbound_size);
pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
dout("%s rbd_dev %p\n", __func__, rbd_dev);
ret = -ENOMEM;
obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
OBJ_REQUEST_PAGES);
if (!obj_request)
ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
&reply_pages, &reply_len);
if (ret && ret != -ETIMEDOUT) {
rbd_warn(rbd_dev, "failed to request lock: %d", ret);
goto out;
}
obj_request->pages = pages;
obj_request->page_count = page_count;
if (reply_len > 0 && reply_len <= PAGE_SIZE) {
void *p = page_address(reply_pages[0]);
void *const end = p + reply_len;
u32 n;
obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
obj_request);
if (!obj_request->osd_req)
goto out;
ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
while (n--) {
u8 struct_v;
u32 len;
osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
class_name, method_name);
if (outbound_size) {
struct ceph_pagelist *pagelist;
ceph_decode_need(&p, end, 8 + 8, e_inval);
p += 8 + 8; /* skip gid and cookie */
pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
ceph_decode_32_safe(&p, end, len, e_inval);
if (!len)
continue;
if (lock_owner_responded) {
rbd_warn(rbd_dev,
"duplicate lock owners detected");
ret = -EIO;
goto out;
}
lock_owner_responded = true;
ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
&struct_v, &len);
if (ret) {
rbd_warn(rbd_dev,
"failed to decode ResponseMessage: %d",
ret);
goto e_inval;
}
ret = ceph_decode_32(&p);
}
}
if (!lock_owner_responded) {
rbd_warn(rbd_dev, "no lock owners detected");
ret = -ETIMEDOUT;
}
out:
ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
return ret;
e_inval:
ret = -EINVAL;
goto out;
}
static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
{
dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
cancel_delayed_work(&rbd_dev->lock_dwork);
if (wake_all)
wake_up_all(&rbd_dev->lock_waitq);
else
wake_up(&rbd_dev->lock_waitq);
}
static int get_lock_owner_info(struct rbd_device *rbd_dev,
struct ceph_locker **lockers, u32 *num_lockers)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
u8 lock_type;
char *lock_tag;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, RBD_LOCK_NAME,
&lock_type, &lock_tag, lockers, num_lockers);
if (ret)
return ret;
if (*num_lockers == 0) {
dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
goto out;
}
if (strcmp(lock_tag, RBD_LOCK_TAG)) {
rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
lock_tag);
ret = -EBUSY;
goto out;
}
if (lock_type == CEPH_CLS_LOCK_SHARED) {
rbd_warn(rbd_dev, "shared lock type detected");
ret = -EBUSY;
goto out;
}
if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
strlen(RBD_LOCK_COOKIE_PREFIX))) {
rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
(*lockers)[0].id.cookie);
ret = -EBUSY;
goto out;
}
out:
kfree(lock_tag);
return ret;
}
static int find_watcher(struct rbd_device *rbd_dev,
const struct ceph_locker *locker)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_watch_item *watchers;
u32 num_watchers;
u64 cookie;
int i;
int ret;
ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, &watchers,
&num_watchers);
if (ret)
return ret;
sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
for (i = 0; i < num_watchers; i++) {
if (!memcmp(&watchers[i].addr, &locker->info.addr,
sizeof(locker->info.addr)) &&
watchers[i].cookie == cookie) {
struct rbd_client_id cid = {
.gid = le64_to_cpu(watchers[i].name.num),
.handle = cookie,
};
dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
rbd_dev, cid.gid, cid.handle);
rbd_set_owner_cid(rbd_dev, &cid);
ret = 1;
goto out;
}
}
dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
ret = 0;
out:
kfree(watchers);
return ret;
}
/*
* lock_rwsem must be held for write
*/
static int rbd_try_lock(struct rbd_device *rbd_dev)
{
struct ceph_client *client = rbd_dev->rbd_client->client;
struct ceph_locker *lockers;
u32 num_lockers;
int ret;
for (;;) {
ret = rbd_lock(rbd_dev);
if (ret != -EBUSY)
return ret;
/* determine if the current lock holder is still alive */
ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
if (ret)
return ret;
if (num_lockers == 0)
goto again;
ret = find_watcher(rbd_dev, lockers);
if (ret) {
if (ret > 0)
ret = 0; /* have to request lock */
goto out;
}
rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
ENTITY_NAME(lockers[0].id.name));
ret = ceph_monc_blacklist_add(&client->monc,
&lockers[0].info.addr);
if (ret) {
rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
ENTITY_NAME(lockers[0].id.name), ret);
goto out;
}
ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, RBD_LOCK_NAME,
lockers[0].id.cookie,
&lockers[0].id.name);
if (ret && ret != -ENOENT)
goto out;
again:
ceph_free_lockers(lockers, num_lockers);
}
out:
ceph_free_lockers(lockers, num_lockers);
return ret;
}
/*
* ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
*/
static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
int *pret)
{
enum rbd_lock_state lock_state;
down_read(&rbd_dev->lock_rwsem);
dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
if (__rbd_is_lock_owner(rbd_dev)) {
lock_state = rbd_dev->lock_state;
up_read(&rbd_dev->lock_rwsem);
return lock_state;
}
up_read(&rbd_dev->lock_rwsem);
down_write(&rbd_dev->lock_rwsem);
dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
if (!__rbd_is_lock_owner(rbd_dev)) {
*pret = rbd_try_lock(rbd_dev);
if (*pret)
rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
}
lock_state = rbd_dev->lock_state;
up_write(&rbd_dev->lock_rwsem);
return lock_state;
}
static void rbd_acquire_lock(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, lock_dwork);
enum rbd_lock_state lock_state;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
again:
lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
if (lock_state == RBD_LOCK_STATE_LOCKED)
wake_requests(rbd_dev, true);
dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
rbd_dev, lock_state, ret);
return;
}
ret = rbd_request_lock(rbd_dev);
if (ret == -ETIMEDOUT) {
goto again; /* treat this as a dead client */
} else if (ret < 0) {
rbd_warn(rbd_dev, "error requesting lock: %d", ret);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
RBD_RETRY_DELAY);
} else {
/*
* lock owner acked, but resend if we don't see them
* release the lock
*/
dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
rbd_dev);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
}
}
/*
* lock_rwsem must be held for write
*/
static bool rbd_release_lock(struct rbd_device *rbd_dev)
{
dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
return false;
rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
downgrade_write(&rbd_dev->lock_rwsem);
/*
* Ensure that all in-flight IO is flushed.
*
* FIXME: ceph_osdc_sync() flushes the entire OSD client, which
* may be shared with other devices.
*/
ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
up_read(&rbd_dev->lock_rwsem);
down_write(&rbd_dev->lock_rwsem);
dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
return false;
if (!rbd_unlock(rbd_dev))
/*
* Give others a chance to grab the lock - we would re-acquire
* almost immediately if we got new IO during ceph_osdc_sync()
* otherwise. We need to ack our own notifications, so this
* lock_dwork will be requeued from rbd_wait_state_locked()
* after wake_requests() in rbd_handle_released_lock().
*/
cancel_delayed_work(&rbd_dev->lock_dwork);
return true;
}
static void rbd_release_lock_work(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
unlock_work);
down_write(&rbd_dev->lock_rwsem);
rbd_release_lock(rbd_dev);
up_write(&rbd_dev->lock_rwsem);
}
static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
void **p)
{
struct rbd_client_id cid = { 0 };
if (struct_v >= 2) {
cid.gid = ceph_decode_64(p);
cid.handle = ceph_decode_64(p);
}
dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
cid.handle);
if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
down_write(&rbd_dev->lock_rwsem);
if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
/*
* we already know that the remote client is
* the owner
*/
up_write(&rbd_dev->lock_rwsem);
return;
}
rbd_set_owner_cid(rbd_dev, &cid);
downgrade_write(&rbd_dev->lock_rwsem);
} else {
down_read(&rbd_dev->lock_rwsem);
}
if (!__rbd_is_lock_owner(rbd_dev))
wake_requests(rbd_dev, false);
up_read(&rbd_dev->lock_rwsem);
}
static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
void **p)
{
struct rbd_client_id cid = { 0 };
if (struct_v >= 2) {
cid.gid = ceph_decode_64(p);
cid.handle = ceph_decode_64(p);
}
dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
cid.handle);
if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
down_write(&rbd_dev->lock_rwsem);
if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
__func__, rbd_dev, cid.gid, cid.handle,
rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
up_write(&rbd_dev->lock_rwsem);
return;
}
rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
downgrade_write(&rbd_dev->lock_rwsem);
} else {
down_read(&rbd_dev->lock_rwsem);
}
if (!__rbd_is_lock_owner(rbd_dev))
wake_requests(rbd_dev, false);
up_read(&rbd_dev->lock_rwsem);
}
static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
void **p)
{
struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
struct rbd_client_id cid = { 0 };
bool need_to_send;
if (struct_v >= 2) {
cid.gid = ceph_decode_64(p);
cid.handle = ceph_decode_64(p);
}
dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
cid.handle);
if (rbd_cid_equal(&cid, &my_cid))
return false;
down_read(&rbd_dev->lock_rwsem);
need_to_send = __rbd_is_lock_owner(rbd_dev);
if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
dout("%s rbd_dev %p queueing unlock_work\n", __func__,
rbd_dev);
queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
}
}
up_read(&rbd_dev->lock_rwsem);
return need_to_send;
}
static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
u64 notify_id, u64 cookie, s32 *result)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
char buf[buf_size];
int ret;
if (result) {
void *p = buf;
/* encode ResponseMessage */
ceph_start_encoding(&p, 1, 1,
buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_32(&p, *result);
} else {
buf_size = 0;
}
ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, notify_id, cookie,
buf, buf_size);
if (ret)
rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
}
static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
u64 cookie)
{
dout("%s rbd_dev %p\n", __func__, rbd_dev);
__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
}
static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
u64 notify_id, u64 cookie, s32 result)
{
dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
}
static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len)
{
struct rbd_device *rbd_dev = arg;
void *p = data;
void *const end = p + data_len;
u8 struct_v;
u32 len;
u32 notify_op;
int ret;
dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
__func__, rbd_dev, cookie, notify_id, data_len);
if (data_len) {
ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
&struct_v, &len);
if (ret) {
rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
ret);
return;
}
notify_op = ceph_decode_32(&p);
} else {
/* legacy notification for header updates */
notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
len = 0;
}
dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
switch (notify_op) {
case RBD_NOTIFY_OP_ACQUIRED_LOCK:
rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
case RBD_NOTIFY_OP_RELEASED_LOCK:
rbd_handle_released_lock(rbd_dev, struct_v, &p);
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
case RBD_NOTIFY_OP_REQUEST_LOCK:
if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
/*
* send ResponseMessage(0) back so the client
* can detect a missing owner
*/
rbd_acknowledge_notify_result(rbd_dev, notify_id,
cookie, 0);
else
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
case RBD_NOTIFY_OP_HEADER_UPDATE:
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "refresh failed: %d", ret);
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
default:
if (rbd_is_lock_owner(rbd_dev))
rbd_acknowledge_notify_result(rbd_dev, notify_id,
cookie, -EOPNOTSUPP);
else
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
}
}
static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
static void rbd_watch_errcb(void *arg, u64 cookie, int err)
{
struct rbd_device *rbd_dev = arg;
rbd_warn(rbd_dev, "encountered watch error: %d", err);
down_write(&rbd_dev->lock_rwsem);
rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
up_write(&rbd_dev->lock_rwsem);
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
__rbd_unregister_watch(rbd_dev);
rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
}
mutex_unlock(&rbd_dev->watch_mutex);
}
/*
* watch_mutex must be locked
*/
static int __rbd_register_watch(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_linger_request *handle;
rbd_assert(!rbd_dev->watch_handle);
dout("%s rbd_dev %p\n", __func__, rbd_dev);
handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, rbd_watch_cb,
rbd_watch_errcb, rbd_dev);
if (IS_ERR(handle))
return PTR_ERR(handle);
rbd_dev->watch_handle = handle;
return 0;
}
/*
* watch_mutex must be locked
*/
static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret;
rbd_assert(rbd_dev->watch_handle);
dout("%s rbd_dev %p\n", __func__, rbd_dev);
ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
if (ret)
rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
rbd_dev->watch_handle = NULL;
}
static int rbd_register_watch(struct rbd_device *rbd_dev)
{
int ret;
mutex_lock(&rbd_dev->watch_mutex);
rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
ret = __rbd_register_watch(rbd_dev);
if (ret)
goto out;
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
out:
mutex_unlock(&rbd_dev->watch_mutex);
return ret;
}
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
dout("%s rbd_dev %p\n", __func__, rbd_dev);
cancel_delayed_work_sync(&rbd_dev->watch_dwork);
cancel_work_sync(&rbd_dev->acquired_lock_work);
cancel_work_sync(&rbd_dev->released_lock_work);
cancel_delayed_work_sync(&rbd_dev->lock_dwork);
cancel_work_sync(&rbd_dev->unlock_work);
}
static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
cancel_tasks_sync(rbd_dev);
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
__rbd_unregister_watch(rbd_dev);
rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
mutex_unlock(&rbd_dev->watch_mutex);
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
static void rbd_reregister_watch(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, watch_dwork);
bool was_lock_owner = false;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
down_write(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
was_lock_owner = rbd_release_lock(rbd_dev);
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
goto fail_unlock;
ret = __rbd_register_watch(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
if (ret != -EBLACKLISTED)
queue_delayed_work(rbd_dev->task_wq,
&rbd_dev->watch_dwork,
RBD_RETRY_DELAY);
goto fail_unlock;
}
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
mutex_unlock(&rbd_dev->watch_mutex);
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
if (was_lock_owner) {
ret = rbd_try_lock(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration lock failed: %d",
ret);
}
up_write(&rbd_dev->lock_rwsem);
wake_requests(rbd_dev, true);
return;
fail_unlock:
mutex_unlock(&rbd_dev->watch_mutex);
up_write(&rbd_dev->lock_rwsem);
}
/*
* Synchronous osd object method call. Returns the number of bytes
* returned in the outbound buffer, or a negative error code.
*/
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
const char *object_name,
const char *class_name,
const char *method_name,
const void *outbound,
size_t outbound_size,
void *inbound,
size_t inbound_size)
{
struct rbd_obj_request *obj_request;
struct page **pages;
u32 page_count;
int ret;
/*
* Method calls are ultimately read operations. The result
* should placed into the inbound buffer provided. They
* also supply outbound data--parameters for the object
* method. Currently if this is present it will be a
* snapshot id.
*/
page_count = (u32)calc_pages_for(0, inbound_size);
pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = -ENOMEM;
obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
OBJ_REQUEST_PAGES);
if (!obj_request)
goto out;
obj_request->pages = pages;
obj_request->page_count = page_count;
obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
obj_request);
if (!obj_request->osd_req)
goto out;
osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
class_name, method_name);
if (outbound_size) {
struct ceph_pagelist *pagelist;
pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
if (!pagelist)
goto out;
......@@ -3242,11 +4033,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
obj_request->pages, inbound_size,
0, false, false);
rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out;
rbd_obj_request_submit(obj_request);
ret = rbd_obj_request_wait(obj_request);
if (ret)
goto out;
......@@ -3267,6 +4055,29 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
return ret;
}
/*
* lock_rwsem must be held for read
*/
static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
{
DEFINE_WAIT(wait);
do {
/*
* Note the use of mod_delayed_work() in rbd_acquire_lock()
* and cancel_delayed_work() in wake_requests().
*/
dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
TASK_UNINTERRUPTIBLE);
up_read(&rbd_dev->lock_rwsem);
schedule();
down_read(&rbd_dev->lock_rwsem);
} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
finish_wait(&rbd_dev->lock_waitq, &wait);
}
static void rbd_queue_workfn(struct work_struct *work)
{
struct request *rq = blk_mq_rq_from_pdu(work);
......@@ -3277,6 +4088,7 @@ static void rbd_queue_workfn(struct work_struct *work)
u64 length = blk_rq_bytes(rq);
enum obj_operation_type op_type;
u64 mapping_size;
bool must_be_locked;
int result;
if (rq->cmd_type != REQ_TYPE_FS) {
......@@ -3338,6 +4150,10 @@ static void rbd_queue_workfn(struct work_struct *work)
if (op_type != OBJ_OP_READ) {
snapc = rbd_dev->header.snapc;
ceph_get_snap_context(snapc);
must_be_locked = rbd_is_lock_supported(rbd_dev);
} else {
must_be_locked = rbd_dev->opts->lock_on_read &&
rbd_is_lock_supported(rbd_dev);
}
up_read(&rbd_dev->header_rwsem);
......@@ -3348,11 +4164,17 @@ static void rbd_queue_workfn(struct work_struct *work)
goto err_rq;
}
if (must_be_locked) {
down_read(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
rbd_wait_state_locked(rbd_dev);
}
img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
snapc);
if (!img_request) {
result = -ENOMEM;
goto err_rq;
goto err_unlock;
}
img_request->rq = rq;
snapc = NULL; /* img_request consumes a ref */
......@@ -3370,10 +4192,15 @@ static void rbd_queue_workfn(struct work_struct *work)
if (result)
goto err_img_request;
if (must_be_locked)
up_read(&rbd_dev->lock_rwsem);
return;
err_img_request:
rbd_img_request_put(img_request);
err_unlock:
if (must_be_locked)
up_read(&rbd_dev->lock_rwsem);
err_rq:
if (result)
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
......@@ -3415,7 +4242,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
u64 offset, u64 length, void *buf)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
struct page **pages = NULL;
u32 page_count;
......@@ -3448,11 +4274,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
obj_request->length,
obj_request->offset & ~PAGE_MASK,
false, false);
rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out;
rbd_obj_request_submit(obj_request);
ret = rbd_obj_request_wait(obj_request);
if (ret)
goto out;
......@@ -3751,13 +4574,40 @@ static ssize_t rbd_minor_show(struct device *dev,
return sprintf(buf, "%d\n", rbd_dev->minor);
}
static ssize_t rbd_client_addr_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
struct ceph_entity_addr *client_addr =
ceph_client_addr(rbd_dev->rbd_client->client);
return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
le32_to_cpu(client_addr->nonce));
}
static ssize_t rbd_client_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "client%lld\n",
ceph_client_id(rbd_dev->rbd_client->client));
ceph_client_gid(rbd_dev->rbd_client->client));
}
static ssize_t rbd_cluster_fsid_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
}
static ssize_t rbd_config_info_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->config_info);
}
static ssize_t rbd_pool_show(struct device *dev,
......@@ -3809,6 +4659,14 @@ static ssize_t rbd_snap_show(struct device *dev,
return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}
static ssize_t rbd_snap_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
}
/*
* For a v2 image, shows the chain of parent images, separated by empty
* lines. For v1 images or if there is no parent, shows "(no parent
......@@ -3861,13 +4719,17 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
static struct attribute *rbd_attrs[] = {
......@@ -3875,12 +4737,16 @@ static struct attribute *rbd_attrs[] = {
&dev_attr_features.attr,
&dev_attr_major.attr,
&dev_attr_minor.attr,
&dev_attr_client_addr.attr,
&dev_attr_client_id.attr,
&dev_attr_cluster_fsid.attr,
&dev_attr_config_info.attr,
&dev_attr_pool.attr,
&dev_attr_pool_id.attr,
&dev_attr_name.attr,
&dev_attr_image_id.attr,
&dev_attr_current_snap.attr,
&dev_attr_snap_id.attr,
&dev_attr_parent.attr,
&dev_attr_refresh.attr,
NULL
......@@ -3943,18 +4809,32 @@ static void rbd_spec_free(struct kref *kref)
kfree(spec);
}
static void rbd_dev_release(struct device *dev)
static void rbd_dev_free(struct rbd_device *rbd_dev)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
bool need_put = !!rbd_dev->opts;
WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
ceph_oid_destroy(&rbd_dev->header_oid);
ceph_oloc_destroy(&rbd_dev->header_oloc);
kfree(rbd_dev->config_info);
rbd_put_client(rbd_dev->rbd_client);
rbd_spec_put(rbd_dev->spec);
kfree(rbd_dev->opts);
kfree(rbd_dev);
}
static void rbd_dev_release(struct device *dev)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
bool need_put = !!rbd_dev->opts;
if (need_put) {
destroy_workqueue(rbd_dev->task_wq);
ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
}
rbd_dev_free(rbd_dev);
/*
* This is racy, but way better than putting module outside of
......@@ -3965,25 +4845,34 @@ static void rbd_dev_release(struct device *dev)
module_put(THIS_MODULE);
}
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
struct rbd_spec *spec,
struct rbd_options *opts)
static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
struct rbd_spec *spec)
{
struct rbd_device *rbd_dev;
rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
if (!rbd_dev)
return NULL;
spin_lock_init(&rbd_dev->lock);
rbd_dev->flags = 0;
atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node);
init_rwsem(&rbd_dev->header_rwsem);
ceph_oid_init(&rbd_dev->header_oid);
ceph_oloc_init(&rbd_dev->header_oloc);
mutex_init(&rbd_dev->watch_mutex);
rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
init_rwsem(&rbd_dev->lock_rwsem);
rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
init_waitqueue_head(&rbd_dev->lock_waitq);
rbd_dev->dev.bus = &rbd_bus_type;
rbd_dev->dev.type = &rbd_device_type;
rbd_dev->dev.parent = &rbd_root_dev;
......@@ -3991,9 +4880,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
rbd_dev->rbd_client = rbdc;
rbd_dev->spec = spec;
rbd_dev->opts = opts;
/* Initialize the layout used for all rbd requests */
rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
rbd_dev->layout.stripe_count = 1;
......@@ -4001,15 +4887,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
rbd_dev->layout.pool_id = spec->pool_id;
RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
/*
* If this is a mapping rbd_dev (as opposed to a parent one),
* pin our module. We have a ref from do_rbd_add(), so use
* __module_get().
*/
if (rbd_dev->opts)
__module_get(THIS_MODULE);
return rbd_dev;
}
/*
* Create a mapping rbd_dev.
*/
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
struct rbd_spec *spec,
struct rbd_options *opts)
{
struct rbd_device *rbd_dev;
rbd_dev = __rbd_dev_create(rbdc, spec);
if (!rbd_dev)
return NULL;
rbd_dev->opts = opts;
/* get an id and fill in device name */
rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
minor_to_rbd_dev_id(1 << MINORBITS),
GFP_KERNEL);
if (rbd_dev->dev_id < 0)
goto fail_rbd_dev;
sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
rbd_dev->name);
if (!rbd_dev->task_wq)
goto fail_dev_id;
/* we have a ref from do_rbd_add() */
__module_get(THIS_MODULE);
dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
return rbd_dev;
fail_dev_id:
ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
fail_rbd_dev:
rbd_dev_free(rbd_dev);
return NULL;
}
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
......@@ -4644,46 +5563,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
return rbd_dev_v2_header_info(rbd_dev);
}
/*
* Get a unique rbd identifier for the given new rbd_dev, and add
* the rbd_dev to the global list.
*/
static int rbd_dev_id_get(struct rbd_device *rbd_dev)
{
int new_dev_id;
new_dev_id = ida_simple_get(&rbd_dev_id_ida,
0, minor_to_rbd_dev_id(1 << MINORBITS),
GFP_KERNEL);
if (new_dev_id < 0)
return new_dev_id;
rbd_dev->dev_id = new_dev_id;
spin_lock(&rbd_dev_list_lock);
list_add_tail(&rbd_dev->node, &rbd_dev_list);
spin_unlock(&rbd_dev_list_lock);
dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
return 0;
}
/*
* Remove an rbd_dev from the global list, and record that its
* identifier is no longer in use.
*/
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
spin_lock(&rbd_dev_list_lock);
list_del_init(&rbd_dev->node);
spin_unlock(&rbd_dev_list_lock);
ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
}
/*
* Skips over white space at *buf, and updates *buf to point to the
* first found non-space character (if any). Returns the length of
......@@ -4859,6 +5738,7 @@ static int rbd_add_parse_args(const char *buf,
rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
copts = ceph_parse_options(options, mon_addrs,
mon_addrs + mon_addrs_size - 1,
......@@ -5076,8 +5956,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
goto out_err;
}
parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
NULL);
parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
if (!parent) {
ret = -ENOMEM;
goto out_err;
......@@ -5112,22 +5991,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
int ret;
/* Get an id and fill in device name. */
ret = rbd_dev_id_get(rbd_dev);
if (ret)
goto err_out_unlock;
BUILD_BUG_ON(DEV_NAME_LEN
< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
/* Record our major and minor device numbers. */
if (!single_major) {
ret = register_blkdev(0, rbd_dev->name);
if (ret < 0)
goto err_out_id;
goto err_out_unlock;
rbd_dev->major = ret;
rbd_dev->minor = 0;
......@@ -5159,9 +6028,14 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
up_write(&rbd_dev->header_rwsem);
spin_lock(&rbd_dev_list_lock);
list_add_tail(&rbd_dev->node, &rbd_dev_list);
spin_unlock(&rbd_dev_list_lock);
add_disk(rbd_dev->disk);
pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
(unsigned long long) rbd_dev->mapping.size);
pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
rbd_dev->header.features);
return ret;
......@@ -5172,8 +6046,6 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
err_out_blkdev:
if (!single_major)
unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
rbd_dev_id_put(rbd_dev);
err_out_unlock:
up_write(&rbd_dev->header_rwsem);
return ret;
......@@ -5234,7 +6106,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
goto err_out_format;
if (!depth) {
ret = rbd_dev_header_watch_sync(rbd_dev);
ret = rbd_register_watch(rbd_dev);
if (ret) {
if (ret == -ENOENT)
pr_info("image %s/%s does not exist\n",
......@@ -5293,7 +6165,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
rbd_dev_unprobe(rbd_dev);
err_out_watch:
if (!depth)
rbd_dev_header_unwatch_sync(rbd_dev);
rbd_unregister_watch(rbd_dev);
err_out_format:
rbd_dev->image_format = 0;
kfree(rbd_dev->spec->image_id);
......@@ -5345,10 +6217,18 @@ static ssize_t do_rbd_add(struct bus_type *bus,
spec = NULL; /* rbd_dev now owns this */
rbd_opts = NULL; /* rbd_dev now owns this */
rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
if (!rbd_dev->config_info) {
rc = -ENOMEM;
goto err_out_rbd_dev;
}
down_write(&rbd_dev->header_rwsem);
rc = rbd_dev_image_probe(rbd_dev, 0);
if (rc < 0)
if (rc < 0) {
up_write(&rbd_dev->header_rwsem);
goto err_out_rbd_dev;
}
/* If we are mapping a snapshot it must be marked read-only */
......@@ -5360,11 +6240,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
rc = rbd_dev_device_setup(rbd_dev);
if (rc) {
/*
* rbd_dev_header_unwatch_sync() can't be moved into
* rbd_unregister_watch() can't be moved into
* rbd_dev_image_release() without refactoring, see
* commit 1f3ef78861ac.
*/
rbd_dev_header_unwatch_sync(rbd_dev);
rbd_unregister_watch(rbd_dev);
rbd_dev_image_release(rbd_dev);
goto out;
}
......@@ -5375,7 +6255,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
return rc;
err_out_rbd_dev:
up_write(&rbd_dev->header_rwsem);
rbd_dev_destroy(rbd_dev);
err_out_client:
rbd_put_client(rbdc);
......@@ -5405,12 +6284,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
rbd_free_disk(rbd_dev);
spin_lock(&rbd_dev_list_lock);
list_del_init(&rbd_dev->node);
spin_unlock(&rbd_dev_list_lock);
clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
device_del(&rbd_dev->dev);
rbd_dev_mapping_clear(rbd_dev);
if (!single_major)
unregister_blkdev(rbd_dev->major, rbd_dev->name);
rbd_dev_id_put(rbd_dev);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
......@@ -5446,18 +6329,26 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
struct rbd_device *rbd_dev = NULL;
struct list_head *tmp;
int dev_id;
unsigned long ul;
char opt_buf[6];
bool already = false;
bool force = false;
int ret;
ret = kstrtoul(buf, 10, &ul);
if (ret)
return ret;
/* convert to int; abort if we lost anything in the conversion */
dev_id = (int)ul;
if (dev_id != ul)
dev_id = -1;
opt_buf[0] = '\0';
sscanf(buf, "%d %5s", &dev_id, opt_buf);
if (dev_id < 0) {
pr_err("dev_id out of range\n");
return -EINVAL;
}
if (opt_buf[0] != '\0') {
if (!strcmp(opt_buf, "force")) {
force = true;
} else {
pr_err("bad remove option at '%s'\n", opt_buf);
return -EINVAL;
}
}
ret = -ENOENT;
spin_lock(&rbd_dev_list_lock);
......@@ -5470,7 +6361,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
}
if (!ret) {
spin_lock_irq(&rbd_dev->lock);
if (rbd_dev->open_count)
if (rbd_dev->open_count && !force)
ret = -EBUSY;
else
already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
......@@ -5481,7 +6372,20 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
if (ret < 0 || already)
return ret;
rbd_dev_header_unwatch_sync(rbd_dev);
if (force) {
/*
* Prevent new IO from being queued and wait for existing
* IO to complete/fail.
*/
blk_mq_freeze_queue(rbd_dev->disk->queue);
blk_set_queue_dying(rbd_dev->disk->queue);
}
down_write(&rbd_dev->lock_rwsem);
if (__rbd_is_lock_owner(rbd_dev))
rbd_unlock(rbd_dev);
up_write(&rbd_dev->lock_rwsem);
rbd_unregister_watch(rbd_dev);
/*
* Don't free anything from rbd_dev->disk until after all
......
......@@ -28,6 +28,17 @@
#define RBD_DATA_PREFIX "rbd_data."
#define RBD_ID_PREFIX "rbd_id."
#define RBD_LOCK_NAME "rbd_lock"
#define RBD_LOCK_TAG "internal"
#define RBD_LOCK_COOKIE_PREFIX "auto"
enum rbd_notify_op {
RBD_NOTIFY_OP_ACQUIRED_LOCK = 0,
RBD_NOTIFY_OP_RELEASED_LOCK = 1,
RBD_NOTIFY_OP_REQUEST_LOCK = 2,
RBD_NOTIFY_OP_HEADER_UPDATE = 3,
};
/*
* For format version 1, rbd image 'foo' consists of objects
* foo.rbd - image metadata
......
......@@ -175,9 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
static int ceph_releasepage(struct page *page, gfp_t g)
{
dout("%p releasepage %p idx %lu\n", page->mapping->host,
page, page->index);
WARN_ON(PageDirty(page));
dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
page, page->index, PageDirty(page) ? "" : "not ");
/* Can we release the page from the cache? */
if (!ceph_release_fscache_page(page, g))
......@@ -298,14 +297,6 @@ static void finish_read(struct ceph_osd_request *req)
kfree(osd_data->pages);
}
static void ceph_unlock_page_vector(struct page **pages, int num_pages)
{
int i;
for (i = 0; i < num_pages; i++)
unlock_page(pages[i]);
}
/*
* start an async read(ahead) operation. return nr_pages we submitted
* a read for on success, or negative error code.
......@@ -370,6 +361,10 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
dout("start_read %p add_to_page_cache failed %p\n",
inode, page);
nr_pages = i;
if (nr_pages > 0) {
len = nr_pages << PAGE_SHIFT;
break;
}
goto out_pages;
}
pages[i] = page;
......@@ -386,8 +381,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
return nr_pages;
out_pages:
ceph_unlock_page_vector(pages, nr_pages);
ceph_release_page_vector(pages, nr_pages);
for (i = 0; i < nr_pages; ++i) {
ceph_fscache_readpage_cancel(inode, pages[i]);
unlock_page(pages[i]);
}
ceph_put_page_vector(pages, nr_pages, false);
out:
ceph_osdc_put_request(req);
return ret;
......
......@@ -902,10 +902,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
return ret;
if (write) {
ret = invalidate_inode_pages2_range(inode->i_mapping,
int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_SHIFT,
(pos + count) >> PAGE_SHIFT);
if (ret < 0)
if (ret2 < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
flags = CEPH_OSD_FLAG_ORDERSNAP |
......
......@@ -210,8 +210,8 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
if (!(fl->fl_flags & FL_FLOCK))
return -ENOLCK;
/* No mandatory locks */
if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
return -ENOLCK;
if (fl->fl_type & LOCK_MAND)
return -EOPNOTSUPP;
dout("ceph_flock, fl_file: %p", fl->fl_file);
......
......@@ -370,6 +370,7 @@ const char *ceph_session_state_name(int s)
case CEPH_MDS_SESSION_CLOSING: return "closing";
case CEPH_MDS_SESSION_RESTARTING: return "restarting";
case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
case CEPH_MDS_SESSION_REJECTED: return "rejected";
default: return "???";
}
}
......@@ -1150,8 +1151,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list,
struct ceph_cap_flush, i_list);
list_del(&cf->i_list);
list_add(&cf->i_list, &to_remove);
list_move(&cf->i_list, &to_remove);
}
spin_lock(&mdsc->cap_dirty_lock);
......@@ -1378,7 +1378,7 @@ static int request_close_session(struct ceph_mds_client *mdsc,
if (!msg)
return -ENOMEM;
ceph_con_send(&session->s_con, msg);
return 0;
return 1;
}
/*
......@@ -2131,6 +2131,10 @@ static int __do_request(struct ceph_mds_client *mdsc,
ceph_session_state_name(session->s_state));
if (session->s_state != CEPH_MDS_SESSION_OPEN &&
session->s_state != CEPH_MDS_SESSION_HUNG) {
if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
err = -EACCES;
goto out_session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING)
__open_session(mdsc, session);
......@@ -2652,6 +2656,15 @@ static void handle_session(struct ceph_mds_session *session,
wake_up_session_caps(session, 0);
break;
case CEPH_SESSION_REJECT:
WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
pr_info("mds%d rejected session\n", session->s_mds);
session->s_state = CEPH_MDS_SESSION_REJECTED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
wake = 2; /* for good measure */
break;
default:
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
WARN_ON(1);
......@@ -3557,11 +3570,11 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
/*
* true if all sessions are closed, or we force unmount
*/
static bool done_closing_sessions(struct ceph_mds_client *mdsc)
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return true;
return atomic_read(&mdsc->num_sessions) == 0;
return atomic_read(&mdsc->num_sessions) <= skipped;
}
/*
......@@ -3572,6 +3585,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
struct ceph_options *opts = mdsc->fsc->client->options;
struct ceph_mds_session *session;
int i;
int skipped = 0;
dout("close_sessions\n");
......@@ -3583,7 +3597,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
continue;
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
__close_session(mdsc, session);
if (__close_session(mdsc, session) <= 0)
skipped++;
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
mutex_lock(&mdsc->mutex);
......@@ -3591,7 +3606,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
mutex_unlock(&mdsc->mutex);
dout("waiting for sessions to close\n");
wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
wait_event_timeout(mdsc->session_close_wq,
done_closing_sessions(mdsc, skipped),
ceph_timeout_jiffies(opts->mount_timeout));
/* tear down remaining sessions */
......
......@@ -121,6 +121,7 @@ enum {
CEPH_MDS_SESSION_CLOSING = 5,
CEPH_MDS_SESSION_RESTARTING = 6,
CEPH_MDS_SESSION_RECONNECTING = 7,
CEPH_MDS_SESSION_REJECTED = 8,
};
struct ceph_mds_session {
......
......@@ -43,6 +43,8 @@ const char *ceph_session_op_name(int op)
case CEPH_SESSION_RECALL_STATE: return "recall_state";
case CEPH_SESSION_FLUSHMSG: return "flushmsg";
case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
case CEPH_SESSION_FORCE_RO: return "force_ro";
case CEPH_SESSION_REJECT: return "reject";
}
return "???";
}
......
......@@ -396,10 +396,12 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
*/
dev_name_end = strchr(dev_name, '/');
if (dev_name_end) {
fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
if (!fsopt->server_path) {
err = -ENOMEM;
goto out;
if (strlen(dev_name_end) > 1) {
fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
if (!fsopt->server_path) {
err = -ENOMEM;
goto out;
}
}
} else {
dev_name_end = dev_name + strlen(dev_name);
......@@ -788,15 +790,10 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
struct inode *inode = req->r_target_inode;
req->r_target_inode = NULL;
dout("open_root_inode success\n");
if (ceph_ino(inode) == CEPH_INO_ROOT &&
fsc->sb->s_root == NULL) {
root = d_make_root(inode);
if (!root) {
root = ERR_PTR(-ENOMEM);
goto out;
}
} else {
root = d_obtain_root(inode);
root = d_make_root(inode);
if (!root) {
root = ERR_PTR(-ENOMEM);
goto out;
}
ceph_init_dentry(root);
dout("open_root_inode success, root dentry is %p\n", root);
......@@ -825,17 +822,24 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
mutex_lock(&fsc->client->mount_mutex);
if (!fsc->sb->s_root) {
const char *path;
err = __ceph_open_session(fsc->client, started);
if (err < 0)
goto out;
dout("mount opening root\n");
root = open_root_dentry(fsc, "", started);
if (!fsc->mount_options->server_path) {
path = "";
dout("mount opening path \\t\n");
} else {
path = fsc->mount_options->server_path + 1;
dout("mount opening path %s\n", path);
}
root = open_root_dentry(fsc, path, started);
if (IS_ERR(root)) {
err = PTR_ERR(root);
goto out;
}
fsc->sb->s_root = root;
fsc->sb->s_root = dget(root);
first = 1;
err = ceph_fs_debugfs_init(fsc);
......@@ -843,19 +847,6 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
goto fail;
}
if (!fsc->mount_options->server_path) {
root = fsc->sb->s_root;
dget(root);
} else {
const char *path = fsc->mount_options->server_path + 1;
dout("mount opening path %s\n", path);
root = open_root_dentry(fsc, path, started);
if (IS_ERR(root)) {
err = PTR_ERR(root);
goto fail;
}
}
fsc->mount_state = CEPH_MOUNT_MOUNTED;
dout("mount success\n");
mutex_unlock(&fsc->client->mount_mutex);
......
......@@ -104,7 +104,7 @@ extern int ceph_auth_build_hello(struct ceph_auth_client *ac,
extern int ceph_handle_auth_reply(struct ceph_auth_client *ac,
void *buf, size_t len,
void *reply_buf, size_t reply_len);
extern int ceph_entity_name_encode(const char *name, void **p, void *end);
int ceph_auth_entity_name_encode(const char *name, void **p, void *end);
extern int ceph_build_auth(struct ceph_auth_client *ac,
void *msg_buf, size_t msg_len);
......
......@@ -138,6 +138,9 @@ struct ceph_dir_layout {
#define CEPH_MSG_POOLOP_REPLY 48
#define CEPH_MSG_POOLOP 49
/* mon commands */
#define CEPH_MSG_MON_COMMAND 50
#define CEPH_MSG_MON_COMMAND_ACK 51
/* osd */
#define CEPH_MSG_OSD_MAP 41
......@@ -176,6 +179,14 @@ struct ceph_mon_statfs_reply {
struct ceph_statfs st;
} __attribute__ ((packed));
struct ceph_mon_command {
struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid;
__le32 num_strs; /* always 1 */
__le32 str_len;
char str[];
} __attribute__ ((packed));
struct ceph_osd_getmap {
struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid;
......@@ -270,6 +281,7 @@ enum {
CEPH_SESSION_FLUSHMSG,
CEPH_SESSION_FLUSHMSG_ACK,
CEPH_SESSION_FORCE_RO,
CEPH_SESSION_REJECT,
};
extern const char *ceph_session_op_name(int op);
......
#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H
#define _LINUX_CEPH_CLS_LOCK_CLIENT_H
#include <linux/ceph/osd_client.h>
enum ceph_cls_lock_type {
CEPH_CLS_LOCK_NONE = 0,
CEPH_CLS_LOCK_EXCLUSIVE = 1,
CEPH_CLS_LOCK_SHARED = 2,
};
struct ceph_locker_id {
struct ceph_entity_name name; /* locker's client name */
char *cookie; /* locker's cookie */
};
struct ceph_locker_info {
struct ceph_entity_addr addr; /* locker's address */
};
struct ceph_locker {
struct ceph_locker_id id;
struct ceph_locker_info info;
};
int ceph_cls_lock(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, u8 type, char *cookie,
char *tag, char *desc, u8 flags);
int ceph_cls_unlock(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, char *cookie);
int ceph_cls_break_lock(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, char *cookie,
struct ceph_entity_name *locker);
void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
int ceph_cls_lock_info(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, u8 *type, char **tag,
struct ceph_locker **lockers, u32 *num_lockers);
#endif
......@@ -264,7 +264,8 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
void *private,
u64 supported_features,
u64 required_features);
extern u64 ceph_client_id(struct ceph_client *client);
struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
u64 ceph_client_gid(struct ceph_client *client);
extern void ceph_destroy_client(struct ceph_client *client);
extern int __ceph_open_session(struct ceph_client *client,
unsigned long started);
......
......@@ -141,6 +141,9 @@ int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
ceph_monc_callback_t cb, u64 private_data);
int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
struct ceph_entity_addr *client_addr);
extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
......
......@@ -120,6 +120,9 @@ struct ceph_osd_req_op {
struct ceph_osd_data request_data;
struct ceph_osd_data response_data;
} notify;
struct {
struct ceph_osd_data response_data;
} list_watchers;
struct {
u64 expected_object_size;
u64 expected_write_size;
......@@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
size_t *preply_len;
};
struct ceph_watch_item {
struct ceph_entity_name name;
u64 cookie;
struct ceph_entity_addr addr;
};
struct ceph_osd_client {
struct ceph_client *client;
......@@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
struct page **pages, u64 length,
u32 alignment, bool pages_from_pool,
bool own_pages);
extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
const char *class, const char *method);
......@@ -389,6 +397,14 @@ extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc);
void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc);
int ceph_osdc_call(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
const char *class, const char *method,
unsigned int flags,
struct page *req_page, size_t req_len,
struct page *resp_page, size_t *resp_len);
extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
......@@ -434,5 +450,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
size_t *preply_len);
int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
struct ceph_osd_linger_request *lreq);
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
struct ceph_watch_item **watchers,
u32 *num_watchers);
#endif
......@@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o
libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
mon_client.o \
cls_lock_client.o \
osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
debugfs.o \
auth.o auth_none.o \
......
......@@ -82,7 +82,10 @@ void ceph_auth_reset(struct ceph_auth_client *ac)
mutex_unlock(&ac->mutex);
}
int ceph_entity_name_encode(const char *name, void **p, void *end)
/*
* EntityName, not to be confused with entity_name_t
*/
int ceph_auth_entity_name_encode(const char *name, void **p, void *end)
{
int len = strlen(name);
......@@ -111,7 +114,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
monhdr->session_mon = cpu_to_le16(-1);
monhdr->session_mon_tid = 0;
ceph_encode_32(&p, 0); /* no protocol, yet */
ceph_encode_32(&p, CEPH_AUTH_UNKNOWN); /* no protocol, yet */
lenp = p;
p += sizeof(u32);
......@@ -124,7 +127,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
for (i = 0; i < num; i++)
ceph_encode_32(&p, supported_protocols[i]);
ret = ceph_entity_name_encode(ac->name, &p, end);
ret = ceph_auth_entity_name_encode(ac->name, &p, end);
if (ret < 0)
goto out;
ceph_decode_need(&p, end, sizeof(u64), bad);
......@@ -259,9 +262,7 @@ int ceph_build_auth(struct ceph_auth_client *ac,
int ret = 0;
mutex_lock(&ac->mutex);
if (!ac->protocol)
ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
else if (ac->ops->should_authenticate(ac))
if (ac->ops->should_authenticate(ac))
ret = ceph_build_auth_request(ac, msg_buf, msg_len);
mutex_unlock(&ac->mutex);
return ret;
......
......@@ -46,7 +46,7 @@ static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac,
int ret;
ceph_encode_8_safe(&p, end, 1, e_range);
ret = ceph_entity_name_encode(ac->name, &p, end);
ret = ceph_auth_entity_name_encode(ac->name, &p, end);
if (ret < 0)
return ret;
......
......@@ -566,11 +566,17 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
}
EXPORT_SYMBOL(ceph_print_client_options);
u64 ceph_client_id(struct ceph_client *client)
struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client)
{
return &client->msgr.inst.addr;
}
EXPORT_SYMBOL(ceph_client_addr);
u64 ceph_client_gid(struct ceph_client *client)
{
return client->monc.auth->global_id;
}
EXPORT_SYMBOL(ceph_client_id);
EXPORT_SYMBOL(ceph_client_gid);
/*
* create a fresh client instance
......@@ -685,7 +691,8 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
return client->auth_err;
}
pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
&client->fsid);
ceph_debugfs_client_init(client);
return 0;
......
......@@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type)
default: return "unknown";
}
}
EXPORT_SYMBOL(ceph_entity_type_name);
const char *ceph_osd_op_name(int op)
{
......
#include <linux/ceph/ceph_debug.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
/**
* ceph_cls_lock - grab rados lock for object
* @oid, @oloc: object to lock
* @lock_name: the name of the lock
* @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
* @cookie: user-defined identifier for this instance of the lock
* @tag: user-defined tag
* @desc: user-defined lock description
* @flags: lock flags
*
* All operations on the same lock should use the same tag.
*/
int ceph_cls_lock(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, u8 type, char *cookie,
char *tag, char *desc, u8 flags)
{
int lock_op_buf_size;
int name_len = strlen(lock_name);
int cookie_len = strlen(cookie);
int tag_len = strlen(tag);
int desc_len = strlen(desc);
void *p, *end;
struct page *lock_op_page;
struct timespec mtime;
int ret;
lock_op_buf_size = name_len + sizeof(__le32) +
cookie_len + sizeof(__le32) +
tag_len + sizeof(__le32) +
desc_len + sizeof(__le32) +
sizeof(struct ceph_timespec) +
/* flag and type */
sizeof(u8) + sizeof(u8) +
CEPH_ENCODING_START_BLK_LEN;
if (lock_op_buf_size > PAGE_SIZE)
return -E2BIG;
lock_op_page = alloc_page(GFP_NOIO);
if (!lock_op_page)
return -ENOMEM;
p = page_address(lock_op_page);
end = p + lock_op_buf_size;
/* encode cls_lock_lock_op struct */
ceph_start_encoding(&p, 1, 1,
lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_string(&p, end, lock_name, name_len);
ceph_encode_8(&p, type);
ceph_encode_string(&p, end, cookie, cookie_len);
ceph_encode_string(&p, end, tag, tag_len);
ceph_encode_string(&p, end, desc, desc_len);
/* only support infinite duration */
memset(&mtime, 0, sizeof(mtime));
ceph_encode_timespec(p, &mtime);
p += sizeof(struct ceph_timespec);
ceph_encode_8(&p, flags);
dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
__func__, lock_name, type, cookie, tag, desc, flags);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
lock_op_page, lock_op_buf_size, NULL, NULL);
dout("%s: status %d\n", __func__, ret);
__free_page(lock_op_page);
return ret;
}
EXPORT_SYMBOL(ceph_cls_lock);
/**
* ceph_cls_unlock - release rados lock for object
* @oid, @oloc: object to lock
* @lock_name: the name of the lock
* @cookie: user-defined identifier for this instance of the lock
*/
int ceph_cls_unlock(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, char *cookie)
{
int unlock_op_buf_size;
int name_len = strlen(lock_name);
int cookie_len = strlen(cookie);
void *p, *end;
struct page *unlock_op_page;
int ret;
unlock_op_buf_size = name_len + sizeof(__le32) +
cookie_len + sizeof(__le32) +
CEPH_ENCODING_START_BLK_LEN;
if (unlock_op_buf_size > PAGE_SIZE)
return -E2BIG;
unlock_op_page = alloc_page(GFP_NOIO);
if (!unlock_op_page)
return -ENOMEM;
p = page_address(unlock_op_page);
end = p + unlock_op_buf_size;
/* encode cls_lock_unlock_op struct */
ceph_start_encoding(&p, 1, 1,
unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_string(&p, end, lock_name, name_len);
ceph_encode_string(&p, end, cookie, cookie_len);
dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
unlock_op_page, unlock_op_buf_size, NULL, NULL);
dout("%s: status %d\n", __func__, ret);
__free_page(unlock_op_page);
return ret;
}
EXPORT_SYMBOL(ceph_cls_unlock);
/**
* ceph_cls_break_lock - release rados lock for object for specified client
* @oid, @oloc: object to lock
* @lock_name: the name of the lock
* @cookie: user-defined identifier for this instance of the lock
* @locker: current lock owner
*/
int ceph_cls_break_lock(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, char *cookie,
struct ceph_entity_name *locker)
{
int break_op_buf_size;
int name_len = strlen(lock_name);
int cookie_len = strlen(cookie);
struct page *break_op_page;
void *p, *end;
int ret;
break_op_buf_size = name_len + sizeof(__le32) +
cookie_len + sizeof(__le32) +
sizeof(u8) + sizeof(__le64) +
CEPH_ENCODING_START_BLK_LEN;
if (break_op_buf_size > PAGE_SIZE)
return -E2BIG;
break_op_page = alloc_page(GFP_NOIO);
if (!break_op_page)
return -ENOMEM;
p = page_address(break_op_page);
end = p + break_op_buf_size;
/* encode cls_lock_break_op struct */
ceph_start_encoding(&p, 1, 1,
break_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_string(&p, end, lock_name, name_len);
ceph_encode_copy(&p, locker, sizeof(*locker));
ceph_encode_string(&p, end, cookie, cookie_len);
dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
cookie, ENTITY_NAME(*locker));
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
break_op_page, break_op_buf_size, NULL, NULL);
dout("%s: status %d\n", __func__, ret);
__free_page(break_op_page);
return ret;
}
EXPORT_SYMBOL(ceph_cls_break_lock);
void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
{
int i;
for (i = 0; i < num_lockers; i++)
kfree(lockers[i].id.cookie);
kfree(lockers);
}
EXPORT_SYMBOL(ceph_free_lockers);
static int decode_locker(void **p, void *end, struct ceph_locker *locker)
{
u8 struct_v;
u32 len;
char *s;
int ret;
ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len);
if (ret)
return ret;
ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name));
s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
if (IS_ERR(s))
return PTR_ERR(s);
locker->id.cookie = s;
ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len);
if (ret)
return ret;
*p += sizeof(struct ceph_timespec); /* skip expiration */
ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
ceph_decode_addr(&locker->info.addr);
len = ceph_decode_32(p);
*p += len; /* skip description */
dout("%s %s%llu cookie %s addr %s\n", __func__,
ENTITY_NAME(locker->id.name), locker->id.cookie,
ceph_pr_addr(&locker->info.addr.in_addr));
return 0;
}
static int decode_lockers(void **p, void *end, u8 *type, char **tag,
struct ceph_locker **lockers, u32 *num_lockers)
{
u8 struct_v;
u32 struct_len;
char *s;
int i;
int ret;
ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply",
&struct_v, &struct_len);
if (ret)
return ret;
*num_lockers = ceph_decode_32(p);
*lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO);
if (!*lockers)
return -ENOMEM;
for (i = 0; i < *num_lockers; i++) {
ret = decode_locker(p, end, *lockers + i);
if (ret)
goto err_free_lockers;
}
*type = ceph_decode_8(p);
s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
if (IS_ERR(s)) {
ret = PTR_ERR(s);
goto err_free_lockers;
}
*tag = s;
return 0;
err_free_lockers:
ceph_free_lockers(*lockers, *num_lockers);
return ret;
}
/*
* On success, the caller is responsible for:
*
* kfree(tag);
* ceph_free_lockers(lockers, num_lockers);
*/
int ceph_cls_lock_info(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
char *lock_name, u8 *type, char **tag,
struct ceph_locker **lockers, u32 *num_lockers)
{
int get_info_op_buf_size;
int name_len = strlen(lock_name);
struct page *get_info_op_page, *reply_page;
size_t reply_len;
void *p, *end;
int ret;
get_info_op_buf_size = name_len + sizeof(__le32) +
CEPH_ENCODING_START_BLK_LEN;
if (get_info_op_buf_size > PAGE_SIZE)
return -E2BIG;
get_info_op_page = alloc_page(GFP_NOIO);
if (!get_info_op_page)
return -ENOMEM;
reply_page = alloc_page(GFP_NOIO);
if (!reply_page) {
__free_page(get_info_op_page);
return -ENOMEM;
}
p = page_address(get_info_op_page);
end = p + get_info_op_buf_size;
/* encode cls_lock_get_info_op struct */
ceph_start_encoding(&p, 1, 1,
get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
ceph_encode_string(&p, end, lock_name, name_len);
dout("%s lock_name %s\n", __func__, lock_name);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
CEPH_OSD_FLAG_READ, get_info_op_page,
get_info_op_buf_size, reply_page, &reply_len);
dout("%s: status %d\n", __func__, ret);
if (ret >= 0) {
p = page_address(reply_page);
end = p + reply_len;
ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
}
__free_page(get_info_op_page);
__free_page(reply_page);
return ret;
}
EXPORT_SYMBOL(ceph_cls_lock_info);
......@@ -245,7 +245,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
/* compute 2^44*log2(input+1) */
static __u64 crush_ln(unsigned int xin)
{
unsigned int x = xin, x1;
unsigned int x = xin;
int iexpon, index1, index2;
__u64 RH, LH, LL, xl64, result;
......@@ -253,9 +253,15 @@ static __u64 crush_ln(unsigned int xin)
/* normalize input */
iexpon = 15;
while (!(x & 0x18000)) {
x <<= 1;
iexpon--;
/*
* figure out number of bits we need to shift and
* do it in one step instead of iteratively
*/
if (!(x & 0x18000)) {
int bits = __builtin_clz(x & 0x1FFFF) - 16;
x <<= bits;
iexpon = 15 - bits;
}
index1 = (x >> 8) << 1;
......@@ -267,12 +273,11 @@ static __u64 crush_ln(unsigned int xin)
/* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */
xl64 = (__s64)x * RH;
xl64 >>= 48;
x1 = xl64;
result = iexpon;
result <<= (12 + 32);
index2 = x1 & 0xff;
index2 = xl64 & 0xff;
/* LL ~ 2^48*log2(1.0+index2/2^15) */
LL = __LL_tbl[index2];
......
......@@ -835,6 +835,83 @@ int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
}
EXPORT_SYMBOL(ceph_monc_get_version_async);
static void handle_command_ack(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
void *p = msg->front.iov_base;
void *const end = p + msg->front_alloc_len;
u64 tid = le64_to_cpu(msg->hdr.tid);
dout("%s msg %p tid %llu\n", __func__, msg, tid);
ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
sizeof(u32), bad);
p += sizeof(struct ceph_mon_request_header);
mutex_lock(&monc->mutex);
req = lookup_generic_request(&monc->generic_request_tree, tid);
if (!req) {
mutex_unlock(&monc->mutex);
return;
}
req->result = ceph_decode_32(&p);
__finish_generic_request(req);
mutex_unlock(&monc->mutex);
complete_generic_request(req);
return;
bad:
pr_err("corrupt mon_command ack, tid %llu\n", tid);
ceph_msg_dump(msg);
}
int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
struct ceph_entity_addr *client_addr)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_command *h;
int ret = -ENOMEM;
int len;
req = alloc_generic_request(monc, GFP_NOIO);
if (!req)
goto out;
req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
if (!req->request)
goto out;
req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
true);
if (!req->reply)
goto out;
mutex_lock(&monc->mutex);
register_generic_request(req);
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->num_strs = cpu_to_le32(1);
len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
\"blacklistop\": \"add\", \
\"addr\": \"%pISpc/%u\" }",
&client_addr->in_addr, le32_to_cpu(client_addr->nonce));
h->str_len = cpu_to_le32(len);
send_generic_request(monc, req);
mutex_unlock(&monc->mutex);
ret = wait_generic_request(req);
out:
put_generic_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);
/*
* Resend pending generic requests.
*/
......@@ -1139,6 +1216,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_get_version_reply(monc, msg);
break;
case CEPH_MSG_MON_COMMAND_ACK:
handle_command_ack(monc, msg);
break;
case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg);
break;
......@@ -1178,6 +1259,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
m = ceph_msg_get(monc->m_subscribe_ack);
break;
case CEPH_MSG_STATFS_REPLY:
case CEPH_MSG_MON_COMMAND_ACK:
return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY:
m = ceph_msg_get(monc->m_auth_reply);
......
......@@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
ceph_osd_data_release(&op->notify.request_data);
ceph_osd_data_release(&op->notify.response_data);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osd_data_release(&op->list_watchers.response_data);
break;
default:
break;
}
......@@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
case CEPH_OSD_OP_NOTIFY:
dst->notify.cookie = cpu_to_le64(src->notify.cookie);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
break;
case CEPH_OSD_OP_SETALLOCHINT:
dst->alloc_hint.expected_object_size =
cpu_to_le64(src->alloc_hint.expected_object_size);
......@@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
ceph_osdc_msg_data_add(req->r_reply,
&op->extent.osd_data);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osdc_msg_data_add(req->r_reply,
&op->list_watchers.response_data);
break;
/* both */
case CEPH_OSD_OP_CALL:
......@@ -3891,12 +3900,121 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
return ret;
}
static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
{
u8 struct_v;
u32 struct_len;
int ret;
ret = ceph_start_decoding(p, end, 2, "watch_item_t",
&struct_v, &struct_len);
if (ret)
return ret;
ceph_decode_copy(p, &item->name, sizeof(item->name));
item->cookie = ceph_decode_64(p);
*p += 4; /* skip timeout_seconds */
if (struct_v >= 2) {
ceph_decode_copy(p, &item->addr, sizeof(item->addr));
ceph_decode_addr(&item->addr);
}
dout("%s %s%llu cookie %llu addr %s\n", __func__,
ENTITY_NAME(item->name), item->cookie,
ceph_pr_addr(&item->addr.in_addr));
return 0;
}
static int decode_watchers(void **p, void *end,
struct ceph_watch_item **watchers,
u32 *num_watchers)
{
u8 struct_v;
u32 struct_len;
int i;
int ret;
ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
&struct_v, &struct_len);
if (ret)
return ret;
*num_watchers = ceph_decode_32(p);
*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
if (!*watchers)
return -ENOMEM;
for (i = 0; i < *num_watchers; i++) {
ret = decode_watcher(p, end, *watchers + i);
if (ret) {
kfree(*watchers);
return ret;
}
}
return 0;
}
/*
* On success, the caller is responsible for:
*
* kfree(watchers);
*/
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
struct ceph_watch_item **watchers,
u32 *num_watchers)
{
struct ceph_osd_request *req;
struct page **pages;
int ret;
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return -ENOMEM;
ceph_oid_copy(&req->r_base_oid, oid);
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
pages = ceph_alloc_page_vector(1, GFP_NOIO);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
goto out_put_req;
}
osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
response_data),
pages, PAGE_SIZE, 0, false, true);
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
void *p = page_address(pages[0]);
void *const end = p + req->r_ops[0].outdata_len;
ret = decode_watchers(&p, end, watchers, num_watchers);
}
out_put_req:
ceph_osdc_put_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_osdc_list_watchers);
/*
* Call all pending notify callbacks - for use after a watch is
* unregistered, to make sure no more callbacks for it will be invoked
*/
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
{
dout("%s osdc %p\n", __func__, osdc);
flush_workqueue(osdc->notify_wq);
}
EXPORT_SYMBOL(ceph_osdc_flush_notifies);
......@@ -3909,6 +4027,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
}
EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
/*
* Execute an OSD class method on an object.
*
* @flags: CEPH_OSD_FLAG_*
* @resp_len: out param for reply length
*/
int ceph_osdc_call(struct ceph_osd_client *osdc,
struct ceph_object_id *oid,
struct ceph_object_locator *oloc,
const char *class, const char *method,
unsigned int flags,
struct page *req_page, size_t req_len,
struct page *resp_page, size_t *resp_len)
{
struct ceph_osd_request *req;
int ret;
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
if (!req)
return -ENOMEM;
ceph_oid_copy(&req->r_base_oid, oid);
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = flags;
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
if (req_page)
osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
0, false, false);
if (resp_page)
osd_req_op_cls_response_data_pages(req, 0, &resp_page,
PAGE_SIZE, 0, false, false);
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
ret = req->r_ops[0].rval;
if (resp_page)
*resp_len = req->r_ops[0].outdata_len;
}
out_put_req:
ceph_osdc_put_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_osdc_call);
/*
* init, shutdown
*/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment