Commit e62e26d3 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-6.5-rc4' of https://github.com/ceph/ceph-client

Pull ceph fixes from Ilya Dryomov:
 "A patch to reduce the potential for erroneous RBD exclusive lock
  blocklisting (fencing) with a couple of prerequisites and a fixup to
  prevent metrics from being sent to the MDS even just once after that
  has been disabled by the user. All marked for stable"

* tag 'ceph-for-6.5-rc4' of https://github.com/ceph/ceph-client:
  rbd: retrieve and check lock owner twice before blocklisting
  rbd: harden get_lock_owner_info() a bit
  rbd: make get_lock_owner_info() return a single locker or NULL
  ceph: never send metrics if disable_send_metrics is set
parents 28d79b74 58815900
......@@ -3849,51 +3849,82 @@ static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
}
static int get_lock_owner_info(struct rbd_device *rbd_dev,
struct ceph_locker **lockers, u32 *num_lockers)
static bool locker_equal(const struct ceph_locker *lhs,
const struct ceph_locker *rhs)
{
return lhs->id.name.type == rhs->id.name.type &&
lhs->id.name.num == rhs->id.name.num &&
!strcmp(lhs->id.cookie, rhs->id.cookie) &&
ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr);
}
static void free_locker(struct ceph_locker *locker)
{
if (locker)
ceph_free_lockers(locker, 1);
}
static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_locker *lockers;
u32 num_lockers;
u8 lock_type;
char *lock_tag;
u64 handle;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, RBD_LOCK_NAME,
&lock_type, &lock_tag, lockers, num_lockers);
if (ret)
return ret;
&lock_type, &lock_tag, &lockers, &num_lockers);
if (ret) {
rbd_warn(rbd_dev, "failed to retrieve lockers: %d", ret);
return ERR_PTR(ret);
}
if (*num_lockers == 0) {
if (num_lockers == 0) {
dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
lockers = NULL;
goto out;
}
if (strcmp(lock_tag, RBD_LOCK_TAG)) {
rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
lock_tag);
ret = -EBUSY;
goto out;
goto err_busy;
}
if (lock_type == CEPH_CLS_LOCK_SHARED) {
rbd_warn(rbd_dev, "shared lock type detected");
ret = -EBUSY;
goto out;
if (lock_type != CEPH_CLS_LOCK_EXCLUSIVE) {
rbd_warn(rbd_dev, "incompatible lock type detected");
goto err_busy;
}
if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
strlen(RBD_LOCK_COOKIE_PREFIX))) {
WARN_ON(num_lockers != 1);
ret = sscanf(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu",
&handle);
if (ret != 1) {
rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
(*lockers)[0].id.cookie);
ret = -EBUSY;
goto out;
lockers[0].id.cookie);
goto err_busy;
}
if (ceph_addr_is_blank(&lockers[0].info.addr)) {
rbd_warn(rbd_dev, "locker has a blank address");
goto err_busy;
}
dout("%s rbd_dev %p got locker %s%llu@%pISpc/%u handle %llu\n",
__func__, rbd_dev, ENTITY_NAME(lockers[0].id.name),
&lockers[0].info.addr.in_addr,
le32_to_cpu(lockers[0].info.addr.nonce), handle);
out:
kfree(lock_tag);
return ret;
return lockers;
err_busy:
kfree(lock_tag);
ceph_free_lockers(lockers, num_lockers);
return ERR_PTR(-EBUSY);
}
static int find_watcher(struct rbd_device *rbd_dev,
......@@ -3947,51 +3978,68 @@ static int find_watcher(struct rbd_device *rbd_dev,
static int rbd_try_lock(struct rbd_device *rbd_dev)
{
struct ceph_client *client = rbd_dev->rbd_client->client;
struct ceph_locker *lockers;
u32 num_lockers;
struct ceph_locker *locker, *refreshed_locker;
int ret;
for (;;) {
locker = refreshed_locker = NULL;
ret = rbd_lock(rbd_dev);
if (ret != -EBUSY)
return ret;
goto out;
/* determine if the current lock holder is still alive */
ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
if (ret)
return ret;
if (num_lockers == 0)
locker = get_lock_owner_info(rbd_dev);
if (IS_ERR(locker)) {
ret = PTR_ERR(locker);
locker = NULL;
goto out;
}
if (!locker)
goto again;
ret = find_watcher(rbd_dev, lockers);
ret = find_watcher(rbd_dev, locker);
if (ret)
goto out; /* request lock or error */
refreshed_locker = get_lock_owner_info(rbd_dev);
if (IS_ERR(refreshed_locker)) {
ret = PTR_ERR(refreshed_locker);
refreshed_locker = NULL;
goto out;
}
if (!refreshed_locker ||
!locker_equal(locker, refreshed_locker))
goto again;
rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
ENTITY_NAME(lockers[0].id.name));
ENTITY_NAME(locker->id.name));
ret = ceph_monc_blocklist_add(&client->monc,
&lockers[0].info.addr);
&locker->info.addr);
if (ret) {
rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
ENTITY_NAME(lockers[0].id.name), ret);
rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d",
ENTITY_NAME(locker->id.name), ret);
goto out;
}
ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, RBD_LOCK_NAME,
lockers[0].id.cookie,
&lockers[0].id.name);
if (ret && ret != -ENOENT)
locker->id.cookie, &locker->id.name);
if (ret && ret != -ENOENT) {
rbd_warn(rbd_dev, "failed to break header lock: %d",
ret);
goto out;
}
again:
ceph_free_lockers(lockers, num_lockers);
free_locker(refreshed_locker);
free_locker(locker);
}
out:
ceph_free_lockers(lockers, num_lockers);
free_locker(refreshed_locker);
free_locker(locker);
return ret;
}
......
......@@ -216,7 +216,7 @@ static void metric_delayed_work(struct work_struct *work)
struct ceph_mds_client *mdsc =
container_of(m, struct ceph_mds_client, metric);
if (mdsc->stopping)
if (mdsc->stopping || disable_send_metrics)
return;
if (!m->session || !check_session_state(m->session)) {
......
......@@ -1123,6 +1123,7 @@ bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
return true;
}
}
EXPORT_SYMBOL(ceph_addr_is_blank);
int ceph_addr_port(const struct ceph_entity_addr *addr)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment