Commit 99d16943 authored by Ilya Dryomov's avatar Ilya Dryomov

rbd: retry watch re-registration periodically

Revamp watch code to support retrying watch re-registration:

- add rbd_dev->watch_state for more robust errcb handling
- store watch cookie separately to avoid dereferencing watch_handle
  which is set to NULL on unwatch
- move re-register code into a delayed work and retry re-registration
  every second, unless the client is blacklisted
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarMike Christie <mchristi@redhat.com>
Tested-by: default avatarMike Christie <mchristi@redhat.com>
parent 1643dfa4
...@@ -114,6 +114,8 @@ static int atomic_dec_return_safe(atomic_t *v) ...@@ -114,6 +114,8 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_OBJ_PREFIX_LEN_MAX 64 #define RBD_OBJ_PREFIX_LEN_MAX 64
#define RBD_RETRY_DELAY msecs_to_jiffies(1000)
/* Feature bits */ /* Feature bits */
#define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_LAYERING (1<<0)
...@@ -319,6 +321,12 @@ struct rbd_img_request { ...@@ -319,6 +321,12 @@ struct rbd_img_request {
#define for_each_obj_request_safe(ireq, oreq, n) \ #define for_each_obj_request_safe(ireq, oreq, n) \
list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
enum rbd_watch_state {
RBD_WATCH_STATE_UNREGISTERED,
RBD_WATCH_STATE_REGISTERED,
RBD_WATCH_STATE_ERROR,
};
struct rbd_mapping { struct rbd_mapping {
u64 size; u64 size;
u64 features; u64 features;
...@@ -352,7 +360,11 @@ struct rbd_device { ...@@ -352,7 +360,11 @@ struct rbd_device {
struct ceph_file_layout layout; /* used for all rbd requests */ struct ceph_file_layout layout; /* used for all rbd requests */
struct mutex watch_mutex;
enum rbd_watch_state watch_state;
struct ceph_osd_linger_request *watch_handle; struct ceph_osd_linger_request *watch_handle;
u64 watch_cookie;
struct delayed_work watch_dwork;
struct workqueue_struct *task_wq; struct workqueue_struct *task_wq;
...@@ -3083,9 +3095,6 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request) ...@@ -3083,9 +3095,6 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
obj_request_done_set(obj_request); obj_request_done_set(obj_request);
} }
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len) u64 notifier_id, void *data, size_t data_len)
{ {
...@@ -3113,35 +3122,34 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, ...@@ -3113,35 +3122,34 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
rbd_warn(rbd_dev, "notify_ack ret %d", ret); rbd_warn(rbd_dev, "notify_ack ret %d", ret);
} }
static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
static void rbd_watch_errcb(void *arg, u64 cookie, int err) static void rbd_watch_errcb(void *arg, u64 cookie, int err)
{ {
struct rbd_device *rbd_dev = arg; struct rbd_device *rbd_dev = arg;
int ret;
rbd_warn(rbd_dev, "encountered watch error: %d", err); rbd_warn(rbd_dev, "encountered watch error: %d", err);
__rbd_dev_header_unwatch_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
__rbd_unregister_watch(rbd_dev);
rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
ret = rbd_dev_header_watch_sync(rbd_dev); queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
if (ret) {
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
return;
} }
mutex_unlock(&rbd_dev->watch_mutex);
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
} }
/* /*
* Initiate a watch request, synchronously. * watch_mutex must be locked
*/ */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) static int __rbd_register_watch(struct rbd_device *rbd_dev)
{ {
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_linger_request *handle; struct ceph_osd_linger_request *handle;
rbd_assert(!rbd_dev->watch_handle); rbd_assert(!rbd_dev->watch_handle);
dout("%s rbd_dev %p\n", __func__, rbd_dev);
handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, rbd_watch_cb, &rbd_dev->header_oloc, rbd_watch_cb,
...@@ -3153,13 +3161,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) ...@@ -3153,13 +3161,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
return 0; return 0;
} }
static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) /*
* watch_mutex must be locked
*/
static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
{ {
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret; int ret;
if (!rbd_dev->watch_handle) rbd_assert(rbd_dev->watch_handle);
return; dout("%s rbd_dev %p\n", __func__, rbd_dev);
ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
if (ret) if (ret)
...@@ -3168,17 +3179,80 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) ...@@ -3168,17 +3179,80 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
rbd_dev->watch_handle = NULL; rbd_dev->watch_handle = NULL;
} }
/* static int rbd_register_watch(struct rbd_device *rbd_dev)
* Tear down a watch request, synchronously. {
*/ int ret;
static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
mutex_lock(&rbd_dev->watch_mutex);
rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
ret = __rbd_register_watch(rbd_dev);
if (ret)
goto out;
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
out:
mutex_unlock(&rbd_dev->watch_mutex);
return ret;
}
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{ {
__rbd_dev_header_unwatch_sync(rbd_dev); dout("%s rbd_dev %p\n", __func__, rbd_dev);
cancel_delayed_work_sync(&rbd_dev->watch_dwork);
}
static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
cancel_tasks_sync(rbd_dev);
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
__rbd_unregister_watch(rbd_dev);
rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
mutex_unlock(&rbd_dev->watch_mutex);
dout("%s flushing notifies\n", __func__);
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
} }
static void rbd_reregister_watch(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, watch_dwork);
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
goto fail_unlock;
ret = __rbd_register_watch(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
if (ret != -EBLACKLISTED)
queue_delayed_work(rbd_dev->task_wq,
&rbd_dev->watch_dwork,
RBD_RETRY_DELAY);
goto fail_unlock;
}
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
mutex_unlock(&rbd_dev->watch_mutex);
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
return;
fail_unlock:
mutex_unlock(&rbd_dev->watch_mutex);
}
/* /*
* Synchronous osd object method call. Returns the number of bytes * Synchronous osd object method call. Returns the number of bytes
* returned in the outbound buffer, or a negative error code. * returned in the outbound buffer, or a negative error code.
...@@ -3945,6 +4019,8 @@ static void rbd_spec_free(struct kref *kref) ...@@ -3945,6 +4019,8 @@ static void rbd_spec_free(struct kref *kref)
static void rbd_dev_free(struct rbd_device *rbd_dev) static void rbd_dev_free(struct rbd_device *rbd_dev)
{ {
WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ceph_oid_destroy(&rbd_dev->header_oid); ceph_oid_destroy(&rbd_dev->header_oid);
ceph_oloc_destroy(&rbd_dev->header_oloc); ceph_oloc_destroy(&rbd_dev->header_oloc);
...@@ -3991,6 +4067,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, ...@@ -3991,6 +4067,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
ceph_oid_init(&rbd_dev->header_oid); ceph_oid_init(&rbd_dev->header_oid);
ceph_oloc_init(&rbd_dev->header_oloc); ceph_oloc_init(&rbd_dev->header_oloc);
mutex_init(&rbd_dev->watch_mutex);
rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.bus = &rbd_bus_type;
rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.type = &rbd_device_type;
rbd_dev->dev.parent = &rbd_root_dev; rbd_dev->dev.parent = &rbd_root_dev;
...@@ -5222,7 +5302,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) ...@@ -5222,7 +5302,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
goto err_out_format; goto err_out_format;
if (!depth) { if (!depth) {
ret = rbd_dev_header_watch_sync(rbd_dev); ret = rbd_register_watch(rbd_dev);
if (ret) { if (ret) {
if (ret == -ENOENT) if (ret == -ENOENT)
pr_info("image %s/%s does not exist\n", pr_info("image %s/%s does not exist\n",
...@@ -5281,7 +5361,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) ...@@ -5281,7 +5361,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
rbd_dev_unprobe(rbd_dev); rbd_dev_unprobe(rbd_dev);
err_out_watch: err_out_watch:
if (!depth) if (!depth)
rbd_dev_header_unwatch_sync(rbd_dev); rbd_unregister_watch(rbd_dev);
err_out_format: err_out_format:
rbd_dev->image_format = 0; rbd_dev->image_format = 0;
kfree(rbd_dev->spec->image_id); kfree(rbd_dev->spec->image_id);
...@@ -5348,11 +5428,11 @@ static ssize_t do_rbd_add(struct bus_type *bus, ...@@ -5348,11 +5428,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
rc = rbd_dev_device_setup(rbd_dev); rc = rbd_dev_device_setup(rbd_dev);
if (rc) { if (rc) {
/* /*
* rbd_dev_header_unwatch_sync() can't be moved into * rbd_unregister_watch() can't be moved into
* rbd_dev_image_release() without refactoring, see * rbd_dev_image_release() without refactoring, see
* commit 1f3ef78861ac. * commit 1f3ef78861ac.
*/ */
rbd_dev_header_unwatch_sync(rbd_dev); rbd_unregister_watch(rbd_dev);
rbd_dev_image_release(rbd_dev); rbd_dev_image_release(rbd_dev);
goto out; goto out;
} }
...@@ -5473,7 +5553,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus, ...@@ -5473,7 +5553,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
if (ret < 0 || already) if (ret < 0 || already)
return ret; return ret;
rbd_dev_header_unwatch_sync(rbd_dev); rbd_unregister_watch(rbd_dev);
/* /*
* Don't free anything from rbd_dev->disk until after all * Don't free anything from rbd_dev->disk until after all
......
...@@ -4014,6 +4014,7 @@ EXPORT_SYMBOL(ceph_osdc_list_watchers); ...@@ -4014,6 +4014,7 @@ EXPORT_SYMBOL(ceph_osdc_list_watchers);
*/ */
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
{ {
dout("%s osdc %p\n", __func__, osdc);
flush_workqueue(osdc->notify_wq); flush_workqueue(osdc->notify_wq);
} }
EXPORT_SYMBOL(ceph_osdc_flush_notifies); EXPORT_SYMBOL(ceph_osdc_flush_notifies);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment