Commit bc1ecc65 authored by Ilya Dryomov

rbd: rework rbd_request_fn()

While it was never a good idea to sleep in request_fn(), commit
34c6bc2c ("locking/mutexes: Add extra reschedule point") made it
a *bad* idea.  Since 3.15, mutex_lock() may reschedule *before* putting
the task on the mutex wait queue, which for a task in a !TASK_RUNNING
state means blocking forever.  request_fn() may be called with
!TASK_RUNNING on the way to schedule() in io_schedule().
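
Roughly, the hazard looks like this (a sketch only, not from the patch;
"example_mutex" is a hypothetical stand-in for any lock taken on the
I/O submission path):

    /* illustrative only */
    static DEFINE_MUTEX(example_mutex);

    static void unsafe_request_fn(struct request_queue *q)
    {
            /*
             * The caller may be flushing its plug list on the way to
             * schedule() in io_schedule(), i.e. it has already set a
             * !TASK_RUNNING task state.  mutex_lock() may then hit a
             * reschedule point *before* the task is queued on the
             * mutex wait list, so nothing is left to wake it up.
             */
            mutex_lock(&example_mutex);     /* may never return */
            mutex_unlock(&example_mutex);
    }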

Offload request handling to a workqueue, one per rbd device, to avoid
calling blocking primitives from rbd_request_fn().
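
In outline (condensed from the diff below; the non-FS request check is
elided), rbd_request_fn() now only moves requests onto a per-device
list and kicks a work item, and the work function drains that list in
process context, where sleeping is fine:

    static void rbd_request_fn(struct request_queue *q)    /* atomic */
    {
            struct rbd_device *rbd_dev = q->queuedata;
            struct request *rq;
            int queued = 0;

            while ((rq = blk_fetch_request(q))) {
                    list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
                    queued++;
            }
            if (queued)
                    queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
    }

    static void rbd_request_workfn(struct work_struct *work)
    {
            struct rbd_device *rbd_dev =
                container_of(work, struct rbd_device, rq_work);
            struct request *rq, *next;
            LIST_HEAD(requests);

            spin_lock_irq(&rbd_dev->lock);  /* rq->q->queue_lock */
            list_splice_init(&rbd_dev->rq_queue, &requests);
            spin_unlock_irq(&rbd_dev->lock);

            list_for_each_entry_safe(rq, next, &requests, queuelist) {
                    list_del_init(&rq->queuelist);
                    rbd_handle_request(rbd_dev, rq);        /* may sleep */
            }
    }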

Fixes: http://tracker.ceph.com/issues/8818

Cc: stable@vger.kernel.org # 3.16, needs backporting for 3.15
Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Tested-by: Eric Eastman <eric0e@aol.com>
Tested-by: Greg Wilson <greg.wilson@keepertech.com>
Reviewed-by: Alex Elder <elder@linaro.org>
parent 282c1052
drivers/block/rbd.c

@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
@@ -332,7 +333,10 @@ struct rbd_device {
 
         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+        struct list_head        rq_queue;       /* incoming rq queue */
         spinlock_t              lock;           /* queue, flags, open_count */
+        struct workqueue_struct *rq_wq;
+        struct work_struct      rq_work;
 
         struct rbd_image_header header;
         unsigned long           flags;          /* possibly lock protected */
@@ -3176,102 +3180,129 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
         return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-                __releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
-        struct rbd_device *rbd_dev = q->queuedata;
-        struct request *rq;
+        struct rbd_img_request *img_request;
+        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+        u64 length = blk_rq_bytes(rq);
+        bool wr = rq_data_dir(rq) == WRITE;
         int result;
 
-        while ((rq = blk_fetch_request(q))) {
-                bool write_request = rq_data_dir(rq) == WRITE;
-                struct rbd_img_request *img_request;
-                u64 offset;
-                u64 length;
-
-                /* Ignore any non-FS requests that filter through. */
-
-                if (rq->cmd_type != REQ_TYPE_FS) {
-                        dout("%s: non-fs request type %d\n", __func__,
-                                (int) rq->cmd_type);
-                        __blk_end_request_all(rq, 0);
-                        continue;
-                }
-
-                /* Ignore/skip any zero-length requests */
-
-                offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-                length = (u64) blk_rq_bytes(rq);
-
-                if (!length) {
-                        dout("%s: zero-length request\n", __func__);
-                        __blk_end_request_all(rq, 0);
-                        continue;
-                }
-
-                spin_unlock_irq(q->queue_lock);
-
-                /* Disallow writes to a read-only device */
-
-                if (write_request) {
-                        result = -EROFS;
-                        if (rbd_dev->mapping.read_only)
-                                goto end_request;
-                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-                }
-
-                /*
-                 * Quit early if the mapped snapshot no longer
-                 * exists.  It's still possible the snapshot will
-                 * have disappeared by the time our request arrives
-                 * at the osd, but there's no sense in sending it if
-                 * we already know.
-                 */
-                if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-                        dout("request for non-existent snapshot");
-                        rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-                        result = -ENXIO;
-                        goto end_request;
-                }
-
-                result = -EINVAL;
-                if (offset && length > U64_MAX - offset + 1) {
-                        rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
-                                offset, length);
-                        goto end_request;       /* Shouldn't happen */
-                }
-
-                result = -EIO;
-                if (offset + length > rbd_dev->mapping.size) {
-                        rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
-                                offset, length, rbd_dev->mapping.size);
-                        goto end_request;
-                }
-
-                result = -ENOMEM;
-                img_request = rbd_img_request_create(rbd_dev, offset, length,
-                                                        write_request);
-                if (!img_request)
-                        goto end_request;
-
-                img_request->rq = rq;
-
-                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                                rq->bio);
-                if (!result)
-                        result = rbd_img_request_submit(img_request);
-                if (result)
-                        rbd_img_request_put(img_request);
-end_request:
-                spin_lock_irq(q->queue_lock);
-                if (result < 0) {
-                        rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
-                                write_request ? "write" : "read",
-                                length, offset, result);
-
-                        __blk_end_request_all(rq, result);
-                }
+        /* Ignore/skip any zero-length requests */
+        if (!length) {
+                dout("%s: zero-length request\n", __func__);
+                result = 0;
+                goto err_rq;
+        }
+
+        /* Disallow writes to a read-only device */
+        if (wr) {
+                if (rbd_dev->mapping.read_only) {
+                        result = -EROFS;
+                        goto err_rq;
+                }
+                rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+        }
+
+        /*
+         * Quit early if the mapped snapshot no longer exists.  It's
+         * still possible the snapshot will have disappeared by the
+         * time our request arrives at the osd, but there's no sense in
+         * sending it if we already know.
+         */
+        if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+                dout("request for non-existent snapshot");
+                rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+                result = -ENXIO;
+                goto err_rq;
+        }
+
+        if (offset && length > U64_MAX - offset + 1) {
+                rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+                         length);
+                result = -EINVAL;
+                goto err_rq;    /* Shouldn't happen */
+        }
+
+        if (offset + length > rbd_dev->mapping.size) {
+                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+                         length, rbd_dev->mapping.size);
+                result = -EIO;
+                goto err_rq;
+        }
+
+        img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+        if (!img_request) {
+                result = -ENOMEM;
+                goto err_rq;
+        }
+        img_request->rq = rq;
+
+        result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+        if (result)
+                goto err_img_request;
+
+        result = rbd_img_request_submit(img_request);
+        if (result)
+                goto err_img_request;
+
+        return;
+
+err_img_request:
+        rbd_img_request_put(img_request);
+err_rq:
+        if (result)
+                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+                         wr ? "write" : "read", length, offset, result);
+        blk_end_request_all(rq, result);
+}
+
+static void rbd_request_workfn(struct work_struct *work)
+{
+        struct rbd_device *rbd_dev =
+            container_of(work, struct rbd_device, rq_work);
+        struct request *rq, *next;
+        LIST_HEAD(requests);
+
+        spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+        list_splice_init(&rbd_dev->rq_queue, &requests);
+        spin_unlock_irq(&rbd_dev->lock);
+
+        list_for_each_entry_safe(rq, next, &requests, queuelist) {
+                list_del_init(&rq->queuelist);
+                rbd_handle_request(rbd_dev, rq);
+        }
+}
+
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+        struct rbd_device *rbd_dev = q->queuedata;
+        struct request *rq;
+        int queued = 0;
+
+        rbd_assert(rbd_dev);
+
+        while ((rq = blk_fetch_request(q))) {
+                /* Ignore any non-FS requests that filter through. */
+                if (rq->cmd_type != REQ_TYPE_FS) {
+                        dout("%s: non-fs request type %d\n", __func__,
+                                (int) rq->cmd_type);
+                        __blk_end_request_all(rq, 0);
+                        continue;
+                }
+
+                list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+                queued++;
         }
+
+        if (queued)
+                queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
 
 /*
@@ -3847,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                 return NULL;
 
         spin_lock_init(&rbd_dev->lock);
+        INIT_LIST_HEAD(&rbd_dev->rq_queue);
+        INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
         rbd_dev->flags = 0;
         atomic_set(&rbd_dev->parent_ref, 0);
         INIT_LIST_HEAD(&rbd_dev->node);
@@ -5051,12 +5084,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
         ret = rbd_dev_mapping_set(rbd_dev);
         if (ret)
                 goto err_out_disk;
+
         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+        rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+        if (!rbd_dev->rq_wq)
+                goto err_out_mapping;
+
         ret = rbd_bus_add_dev(rbd_dev);
         if (ret)
-                goto err_out_mapping;
+                goto err_out_workqueue;
 
         /* Everything's ready.  Announce the disk to the world. */
@@ -5068,6 +5106,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
         return ret;
 
+err_out_workqueue:
+        destroy_workqueue(rbd_dev->rq_wq);
+        rbd_dev->rq_wq = NULL;
 err_out_mapping:
         rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5314,6 +5355,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+        destroy_workqueue(rbd_dev->rq_wq);
         rbd_free_disk(rbd_dev);
         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
         rbd_dev_mapping_clear(rbd_dev);