Commit cff7f223 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-5.17-rc3' of git://github.com/ceph/ceph-client

Pull ceph fixes from Ilya Dryomov:
 "A patch to make it possible to disable zero copy path in the messenger
  to avoid checksum or authentication tag mismatches and ensuing session
  resets in case the destination buffer isn't guaranteed to be stable"

* tag 'ceph-for-5.17-rc3' of git://github.com/ceph/ceph-client:
  libceph: optionally use bounce buffer on recv path in crc mode
  libceph: make recv path in secure mode work the same as send path
parents 1eb7de17 038b8d1d
......@@ -35,6 +35,7 @@
#define CEPH_OPT_TCP_NODELAY (1<<4) /* TCP_NODELAY on TCP sockets */
#define CEPH_OPT_NOMSGSIGN (1<<5) /* don't sign msgs (msgr1) */
#define CEPH_OPT_ABORT_ON_FULL (1<<6) /* abort w/ ENOSPC when full */
#define CEPH_OPT_RXBOUNCE (1<<7) /* double-buffer read data */
#define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY)
......
......@@ -383,6 +383,10 @@ struct ceph_connection_v2_info {
struct ceph_gcm_nonce in_gcm_nonce;
struct ceph_gcm_nonce out_gcm_nonce;
struct page **in_enc_pages;
int in_enc_page_cnt;
int in_enc_resid;
int in_enc_i;
struct page **out_enc_pages;
int out_enc_page_cnt;
int out_enc_resid;
......@@ -457,6 +461,7 @@ struct ceph_connection {
struct ceph_msg *out_msg; /* sending message (== tail of
out_sent) */
struct page *bounce_page;
u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */
......
......@@ -246,6 +246,7 @@ enum {
Opt_cephx_sign_messages,
Opt_tcp_nodelay,
Opt_abort_on_full,
Opt_rxbounce,
};
enum {
......@@ -295,6 +296,7 @@ static const struct fs_parameter_spec ceph_parameters[] = {
fsparam_u32 ("osdkeepalive", Opt_osdkeepalivetimeout),
fsparam_enum ("read_from_replica", Opt_read_from_replica,
ceph_param_read_from_replica),
fsparam_flag ("rxbounce", Opt_rxbounce),
fsparam_enum ("ms_mode", Opt_ms_mode,
ceph_param_ms_mode),
fsparam_string ("secret", Opt_secret),
......@@ -584,6 +586,9 @@ int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
case Opt_abort_on_full:
opt->flags |= CEPH_OPT_ABORT_ON_FULL;
break;
case Opt_rxbounce:
opt->flags |= CEPH_OPT_RXBOUNCE;
break;
default:
BUG();
......@@ -660,6 +665,8 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
seq_puts(m, "notcp_nodelay,");
if (show_all && (opt->flags & CEPH_OPT_ABORT_ON_FULL))
seq_puts(m, "abort_on_full,");
if (opt->flags & CEPH_OPT_RXBOUNCE)
seq_puts(m, "rxbounce,");
if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
seq_printf(m, "mount_timeout=%d,",
......
......@@ -515,6 +515,10 @@ static void ceph_con_reset_protocol(struct ceph_connection *con)
ceph_msg_put(con->out_msg);
con->out_msg = NULL;
}
if (con->bounce_page) {
__free_page(con->bounce_page);
con->bounce_page = NULL;
}
if (ceph_msgr2(from_msgr(con->msgr)))
ceph_con_v2_reset_protocol(con);
......
......@@ -992,8 +992,7 @@ static int read_partial_message_section(struct ceph_connection *con,
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor;
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
struct page *page;
size_t page_offset;
......@@ -1001,9 +1000,6 @@ static int read_partial_msg_data(struct ceph_connection *con)
u32 crc = 0;
int ret;
if (!msg->num_data_items)
return -EIO;
if (do_datacrc)
crc = con->in_data_crc;
while (cursor->total_resid) {
......@@ -1031,6 +1027,46 @@ static int read_partial_msg_data(struct ceph_connection *con)
return 1; /* must return > 0 to indicate success */
}
static int read_partial_msg_data_bounce(struct ceph_connection *con)
{
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
struct page *page;
size_t off, len;
u32 crc;
int ret;
if (unlikely(!con->bounce_page)) {
con->bounce_page = alloc_page(GFP_NOIO);
if (!con->bounce_page) {
pr_err("failed to allocate bounce page\n");
return -ENOMEM;
}
}
crc = con->in_data_crc;
while (cursor->total_resid) {
if (!cursor->resid) {
ceph_msg_data_advance(cursor, 0);
continue;
}
page = ceph_msg_data_next(cursor, &off, &len, NULL);
ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
if (ret <= 0) {
con->in_data_crc = crc;
return ret;
}
crc = crc32c(crc, page_address(con->bounce_page), ret);
memcpy_to_page(page, off, page_address(con->bounce_page), ret);
ceph_msg_data_advance(cursor, ret);
}
con->in_data_crc = crc;
return 1; /* must return > 0 to indicate success */
}
/*
* read (part of) a message.
*/
......@@ -1141,7 +1177,13 @@ static int read_partial_message(struct ceph_connection *con)
/* (page) data */
if (data_len) {
ret = read_partial_msg_data(con);
if (!m->num_data_items)
return -EIO;
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
ret = read_partial_msg_data_bounce(con);
else
ret = read_partial_msg_data(con);
if (ret <= 0)
return ret;
}
......
......@@ -57,8 +57,9 @@
#define IN_S_HANDLE_CONTROL_REMAINDER 3
#define IN_S_PREPARE_READ_DATA 4
#define IN_S_PREPARE_READ_DATA_CONT 5
#define IN_S_HANDLE_EPILOGUE 6
#define IN_S_FINISH_SKIP 7
#define IN_S_PREPARE_READ_ENC_PAGE 6
#define IN_S_HANDLE_EPILOGUE 7
#define IN_S_FINISH_SKIP 8
#define OUT_S_QUEUE_DATA 1
#define OUT_S_QUEUE_DATA_CONT 2
......@@ -1032,22 +1033,41 @@ static int decrypt_control_remainder(struct ceph_connection *con)
padded_len(rem_len) + CEPH_GCM_TAG_LEN);
}
static int decrypt_message(struct ceph_connection *con)
static int decrypt_tail(struct ceph_connection *con)
{
struct sg_table enc_sgt = {};
struct sg_table sgt = {};
int tail_len;
int ret;
tail_len = tail_onwire_len(con->in_msg, true);
ret = sg_alloc_table_from_pages(&enc_sgt, con->v2.in_enc_pages,
con->v2.in_enc_page_cnt, 0, tail_len,
GFP_NOIO);
if (ret)
goto out;
ret = setup_message_sgs(&sgt, con->in_msg, FRONT_PAD(con->v2.in_buf),
MIDDLE_PAD(con->v2.in_buf), DATA_PAD(con->v2.in_buf),
con->v2.in_buf, true);
if (ret)
goto out;
ret = gcm_crypt(con, false, sgt.sgl, sgt.sgl,
tail_onwire_len(con->in_msg, true));
dout("%s con %p msg %p enc_page_cnt %d sg_cnt %d\n", __func__, con,
con->in_msg, con->v2.in_enc_page_cnt, sgt.orig_nents);
ret = gcm_crypt(con, false, enc_sgt.sgl, sgt.sgl, tail_len);
if (ret)
goto out;
WARN_ON(!con->v2.in_enc_page_cnt);
ceph_release_page_vector(con->v2.in_enc_pages,
con->v2.in_enc_page_cnt);
con->v2.in_enc_pages = NULL;
con->v2.in_enc_page_cnt = 0;
out:
sg_free_table(&sgt);
sg_free_table(&enc_sgt);
return ret;
}
......@@ -1733,54 +1753,157 @@ static int prepare_read_control_remainder(struct ceph_connection *con)
return 0;
}
static void prepare_read_data(struct ceph_connection *con)
static int prepare_read_data(struct ceph_connection *con)
{
struct bio_vec bv;
if (!con_secure(con))
con->in_data_crc = -1;
con->in_data_crc = -1;
ceph_msg_data_cursor_init(&con->v2.in_cursor, con->in_msg,
data_len(con->in_msg));
get_bvec_at(&con->v2.in_cursor, &bv);
set_in_bvec(con, &bv);
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
if (unlikely(!con->bounce_page)) {
con->bounce_page = alloc_page(GFP_NOIO);
if (!con->bounce_page) {
pr_err("failed to allocate bounce page\n");
return -ENOMEM;
}
}
bv.bv_page = con->bounce_page;
bv.bv_offset = 0;
set_in_bvec(con, &bv);
} else {
set_in_bvec(con, &bv);
}
con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT;
return 0;
}
static void prepare_read_data_cont(struct ceph_connection *con)
{
struct bio_vec bv;
if (!con_secure(con))
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
con->in_data_crc = crc32c(con->in_data_crc,
page_address(con->bounce_page),
con->v2.in_bvec.bv_len);
get_bvec_at(&con->v2.in_cursor, &bv);
memcpy_to_page(bv.bv_page, bv.bv_offset,
page_address(con->bounce_page),
con->v2.in_bvec.bv_len);
} else {
con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
con->v2.in_bvec.bv_page,
con->v2.in_bvec.bv_offset,
con->v2.in_bvec.bv_len);
}
ceph_msg_data_advance(&con->v2.in_cursor, con->v2.in_bvec.bv_len);
if (con->v2.in_cursor.total_resid) {
get_bvec_at(&con->v2.in_cursor, &bv);
set_in_bvec(con, &bv);
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
bv.bv_page = con->bounce_page;
bv.bv_offset = 0;
set_in_bvec(con, &bv);
} else {
set_in_bvec(con, &bv);
}
WARN_ON(con->v2.in_state != IN_S_PREPARE_READ_DATA_CONT);
return;
}
/*
* We've read all data. Prepare to read data padding (if any)
* and epilogue.
* We've read all data. Prepare to read epilogue.
*/
reset_in_kvecs(con);
if (con_secure(con)) {
if (need_padding(data_len(con->in_msg)))
add_in_kvec(con, DATA_PAD(con->v2.in_buf),
padding_len(data_len(con->in_msg)));
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_SECURE_LEN);
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
}
static int prepare_read_tail_plain(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
if (!front_len(msg) && !middle_len(msg)) {
WARN_ON(!data_len(msg));
return prepare_read_data(con);
}
reset_in_kvecs(con);
if (front_len(msg)) {
add_in_kvec(con, msg->front.iov_base, front_len(msg));
WARN_ON(msg->front.iov_len != front_len(msg));
}
if (middle_len(msg)) {
add_in_kvec(con, msg->middle->vec.iov_base, middle_len(msg));
WARN_ON(msg->middle->vec.iov_len != middle_len(msg));
}
if (data_len(msg)) {
con->v2.in_state = IN_S_PREPARE_READ_DATA;
} else {
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
}
return 0;
}
static void prepare_read_enc_page(struct ceph_connection *con)
{
struct bio_vec bv;
dout("%s con %p i %d resid %d\n", __func__, con, con->v2.in_enc_i,
con->v2.in_enc_resid);
WARN_ON(!con->v2.in_enc_resid);
bv.bv_page = con->v2.in_enc_pages[con->v2.in_enc_i];
bv.bv_offset = 0;
bv.bv_len = min(con->v2.in_enc_resid, (int)PAGE_SIZE);
set_in_bvec(con, &bv);
con->v2.in_enc_i++;
con->v2.in_enc_resid -= bv.bv_len;
if (con->v2.in_enc_resid) {
con->v2.in_state = IN_S_PREPARE_READ_ENC_PAGE;
return;
}
/*
* We are set to read the last piece of ciphertext (ending
* with epilogue) + auth tag.
*/
WARN_ON(con->v2.in_enc_i != con->v2.in_enc_page_cnt);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
}
static int prepare_read_tail_secure(struct ceph_connection *con)
{
struct page **enc_pages;
int enc_page_cnt;
int tail_len;
tail_len = tail_onwire_len(con->in_msg, true);
WARN_ON(!tail_len);
enc_page_cnt = calc_pages_for(0, tail_len);
enc_pages = ceph_alloc_page_vector(enc_page_cnt, GFP_NOIO);
if (IS_ERR(enc_pages))
return PTR_ERR(enc_pages);
WARN_ON(con->v2.in_enc_pages || con->v2.in_enc_page_cnt);
con->v2.in_enc_pages = enc_pages;
con->v2.in_enc_page_cnt = enc_page_cnt;
con->v2.in_enc_resid = tail_len;
con->v2.in_enc_i = 0;
prepare_read_enc_page(con);
return 0;
}
static void __finish_skip(struct ceph_connection *con)
{
con->in_seq++;
......@@ -2589,47 +2712,26 @@ static int __handle_control(struct ceph_connection *con, void *p)
}
msg = con->in_msg; /* set in process_message_header() */
if (!front_len(msg) && !middle_len(msg)) {
if (!data_len(msg))
return process_message(con);
prepare_read_data(con);
return 0;
}
reset_in_kvecs(con);
if (front_len(msg)) {
WARN_ON(front_len(msg) > msg->front_alloc_len);
add_in_kvec(con, msg->front.iov_base, front_len(msg));
msg->front.iov_len = front_len(msg);
if (con_secure(con) && need_padding(front_len(msg)))
add_in_kvec(con, FRONT_PAD(con->v2.in_buf),
padding_len(front_len(msg)));
} else {
msg->front.iov_len = 0;
}
if (middle_len(msg)) {
WARN_ON(middle_len(msg) > msg->middle->alloc_len);
add_in_kvec(con, msg->middle->vec.iov_base, middle_len(msg));
msg->middle->vec.iov_len = middle_len(msg);
if (con_secure(con) && need_padding(middle_len(msg)))
add_in_kvec(con, MIDDLE_PAD(con->v2.in_buf),
padding_len(middle_len(msg)));
} else if (msg->middle) {
msg->middle->vec.iov_len = 0;
}
if (data_len(msg)) {
con->v2.in_state = IN_S_PREPARE_READ_DATA;
} else {
add_in_kvec(con, con->v2.in_buf,
con_secure(con) ? CEPH_EPILOGUE_SECURE_LEN :
CEPH_EPILOGUE_PLAIN_LEN);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
}
return 0;
if (!front_len(msg) && !middle_len(msg) && !data_len(msg))
return process_message(con);
if (con_secure(con))
return prepare_read_tail_secure(con);
return prepare_read_tail_plain(con);
}
static int handle_preamble(struct ceph_connection *con)
......@@ -2717,7 +2819,7 @@ static int handle_epilogue(struct ceph_connection *con)
int ret;
if (con_secure(con)) {
ret = decrypt_message(con);
ret = decrypt_tail(con);
if (ret) {
if (ret == -EBADMSG)
con->error_msg = "integrity error, bad epilogue auth tag";
......@@ -2785,13 +2887,16 @@ static int populate_in_iter(struct ceph_connection *con)
ret = handle_control_remainder(con);
break;
case IN_S_PREPARE_READ_DATA:
prepare_read_data(con);
ret = 0;
ret = prepare_read_data(con);
break;
case IN_S_PREPARE_READ_DATA_CONT:
prepare_read_data_cont(con);
ret = 0;
break;
case IN_S_PREPARE_READ_ENC_PAGE:
prepare_read_enc_page(con);
ret = 0;
break;
case IN_S_HANDLE_EPILOGUE:
ret = handle_epilogue(con);
break;
......@@ -3326,20 +3431,16 @@ void ceph_con_v2_revoke(struct ceph_connection *con)
static void revoke_at_prepare_read_data(struct ceph_connection *con)
{
int remaining; /* data + [data padding] + epilogue */
int remaining;
int resid;
WARN_ON(con_secure(con));
WARN_ON(!data_len(con->in_msg));
WARN_ON(!iov_iter_is_kvec(&con->v2.in_iter));
resid = iov_iter_count(&con->v2.in_iter);
WARN_ON(!resid);
if (con_secure(con))
remaining = padded_len(data_len(con->in_msg)) +
CEPH_EPILOGUE_SECURE_LEN;
else
remaining = data_len(con->in_msg) + CEPH_EPILOGUE_PLAIN_LEN;
remaining = data_len(con->in_msg) + CEPH_EPILOGUE_PLAIN_LEN;
dout("%s con %p resid %d remaining %d\n", __func__, con, resid,
remaining);
con->v2.in_iter.count -= resid;
......@@ -3350,8 +3451,9 @@ static void revoke_at_prepare_read_data(struct ceph_connection *con)
static void revoke_at_prepare_read_data_cont(struct ceph_connection *con)
{
int recved, resid; /* current piece of data */
int remaining; /* [data padding] + epilogue */
int remaining;
WARN_ON(con_secure(con));
WARN_ON(!data_len(con->in_msg));
WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter));
resid = iov_iter_count(&con->v2.in_iter);
......@@ -3363,12 +3465,7 @@ static void revoke_at_prepare_read_data_cont(struct ceph_connection *con)
ceph_msg_data_advance(&con->v2.in_cursor, recved);
WARN_ON(resid > con->v2.in_cursor.total_resid);
if (con_secure(con))
remaining = padding_len(data_len(con->in_msg)) +
CEPH_EPILOGUE_SECURE_LEN;
else
remaining = CEPH_EPILOGUE_PLAIN_LEN;
remaining = CEPH_EPILOGUE_PLAIN_LEN;
dout("%s con %p total_resid %zu remaining %d\n", __func__, con,
con->v2.in_cursor.total_resid, remaining);
con->v2.in_iter.count -= resid;
......@@ -3376,11 +3473,26 @@ static void revoke_at_prepare_read_data_cont(struct ceph_connection *con)
con->v2.in_state = IN_S_FINISH_SKIP;
}
static void revoke_at_prepare_read_enc_page(struct ceph_connection *con)
{
int resid; /* current enc page (not necessarily data) */
WARN_ON(!con_secure(con));
WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter));
resid = iov_iter_count(&con->v2.in_iter);
WARN_ON(!resid || resid > con->v2.in_bvec.bv_len);
dout("%s con %p resid %d enc_resid %d\n", __func__, con, resid,
con->v2.in_enc_resid);
con->v2.in_iter.count -= resid;
set_in_skip(con, resid + con->v2.in_enc_resid);
con->v2.in_state = IN_S_FINISH_SKIP;
}
static void revoke_at_handle_epilogue(struct ceph_connection *con)
{
int resid;
WARN_ON(!iov_iter_is_kvec(&con->v2.in_iter));
resid = iov_iter_count(&con->v2.in_iter);
WARN_ON(!resid);
......@@ -3399,6 +3511,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
case IN_S_PREPARE_READ_DATA_CONT:
revoke_at_prepare_read_data_cont(con);
break;
case IN_S_PREPARE_READ_ENC_PAGE:
revoke_at_prepare_read_enc_page(con);
break;
case IN_S_HANDLE_EPILOGUE:
revoke_at_handle_epilogue(con);
break;
......@@ -3432,6 +3547,13 @@ void ceph_con_v2_reset_protocol(struct ceph_connection *con)
clear_out_sign_kvecs(con);
free_conn_bufs(con);
if (con->v2.in_enc_pages) {
WARN_ON(!con->v2.in_enc_page_cnt);
ceph_release_page_vector(con->v2.in_enc_pages,
con->v2.in_enc_page_cnt);
con->v2.in_enc_pages = NULL;
con->v2.in_enc_page_cnt = 0;
}
if (con->v2.out_enc_pages) {
WARN_ON(!con->v2.out_enc_page_cnt);
ceph_release_page_vector(con->v2.out_enc_pages,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment