Commit eed0ef2c authored by Sage Weil's avatar Sage Weil

ceph: separate banner and connect during handshake into distinct stages

We need to make sure we only swab the address during the banner once.  So
break process_banner out of process_connect, and clean up the surrounding
code so that these are distinct phases of the handshake.
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 685f9a5d
...@@ -564,10 +564,26 @@ static void prepare_write_keepalive(struct ceph_connection *con) ...@@ -564,10 +564,26 @@ static void prepare_write_keepalive(struct ceph_connection *con)
/* /*
* We connected to a peer and are saying hello. * We connected to a peer and are saying hello.
*/ */
static void prepare_write_connect(struct ceph_messenger *msgr, static void prepare_write_banner(struct ceph_messenger *msgr,
struct ceph_connection *con) struct ceph_connection *con)
{ {
int len = strlen(CEPH_BANNER); int len = strlen(CEPH_BANNER);
con->out_kvec[0].iov_base = CEPH_BANNER;
con->out_kvec[0].iov_len = len;
con->out_kvec[1].iov_base = &msgr->my_enc_addr;
con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
con->out_kvec_left = 2;
con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con,
int after_banner)
{
unsigned global_seq = get_global_seq(con->msgr, 0); unsigned global_seq = get_global_seq(con->msgr, 0);
int proto; int proto;
...@@ -595,32 +611,14 @@ static void prepare_write_connect(struct ceph_messenger *msgr, ...@@ -595,32 +611,14 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
if (test_bit(LOSSYTX, &con->state)) if (test_bit(LOSSYTX, &con->state))
con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY; con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
con->out_kvec[0].iov_base = CEPH_BANNER; if (!after_banner) {
con->out_kvec[0].iov_len = len; con->out_kvec_left = 0;
con->out_kvec[1].iov_base = &msgr->my_enc_addr; con->out_kvec_bytes = 0;
con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr); }
con->out_kvec[2].iov_base = &con->out_connect; con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
con->out_kvec[2].iov_len = sizeof(con->out_connect); con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 3; con->out_kvec_left++;
con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr) + con->out_kvec_bytes += sizeof(con->out_connect);
sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
static void prepare_write_connect_retry(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
dout("prepare_write_connect_retry %p\n", con);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq =
cpu_to_le32(get_global_seq(con->msgr, 0));
con->out_kvec[0].iov_base = &con->out_connect;
con->out_kvec[0].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 1;
con->out_kvec_bytes = sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec; con->out_kvec_cur = con->out_kvec;
con->out_more = 0; con->out_more = 0;
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
...@@ -778,6 +776,12 @@ static int write_partial_skip(struct ceph_connection *con) ...@@ -778,6 +776,12 @@ static int write_partial_skip(struct ceph_connection *con)
/* /*
* Prepare to read connection handshake, or an ack. * Prepare to read connection handshake, or an ack.
*/ */
static void prepare_read_banner(struct ceph_connection *con)
{
dout("prepare_read_banner %p\n", con);
con->in_base_pos = 0;
}
static void prepare_read_connect(struct ceph_connection *con) static void prepare_read_connect(struct ceph_connection *con)
{ {
dout("prepare_read_connect %p\n", con); dout("prepare_read_connect %p\n", con);
...@@ -829,11 +833,11 @@ static int read_partial(struct ceph_connection *con, ...@@ -829,11 +833,11 @@ static int read_partial(struct ceph_connection *con,
/* /*
* Read all or part of the connect-side handshake on a new connection * Read all or part of the connect-side handshake on a new connection
*/ */
static int read_partial_connect(struct ceph_connection *con) static int read_partial_banner(struct ceph_connection *con)
{ {
int ret, to = 0; int ret, to = 0;
dout("read_partial_connect %p at %d\n", con, con->in_base_pos); dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
/* peer's banner */ /* peer's banner */
ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
...@@ -847,6 +851,16 @@ static int read_partial_connect(struct ceph_connection *con) ...@@ -847,6 +851,16 @@ static int read_partial_connect(struct ceph_connection *con)
&con->peer_addr_for_me); &con->peer_addr_for_me);
if (ret <= 0) if (ret <= 0)
goto out; goto out;
out:
return ret;
}
static int read_partial_connect(struct ceph_connection *con)
{
int ret, to = 0;
dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
if (ret <= 0) if (ret <= 0)
goto out; goto out;
...@@ -856,6 +870,7 @@ static int read_partial_connect(struct ceph_connection *con) ...@@ -856,6 +870,7 @@ static int read_partial_connect(struct ceph_connection *con)
le32_to_cpu(con->in_reply.global_seq)); le32_to_cpu(con->in_reply.global_seq));
out: out:
return ret; return ret;
} }
/* /*
...@@ -976,9 +991,9 @@ int ceph_parse_ips(const char *c, const char *end, ...@@ -976,9 +991,9 @@ int ceph_parse_ips(const char *c, const char *end,
return -EINVAL; return -EINVAL;
} }
static int process_connect(struct ceph_connection *con) static int process_banner(struct ceph_connection *con)
{ {
dout("process_connect on %p tag %d\n", con, (int)con->in_tag); dout("process_banner on %p\n", con);
if (verify_hello(con) < 0) if (verify_hello(con) < 0)
return -1; return -1;
...@@ -1016,10 +1031,19 @@ static int process_connect(struct ceph_connection *con) ...@@ -1016,10 +1031,19 @@ static int process_connect(struct ceph_connection *con)
sizeof(con->peer_addr_for_me.in_addr)); sizeof(con->peer_addr_for_me.in_addr));
addr_set_port(&con->msgr->inst.addr.in_addr, port); addr_set_port(&con->msgr->inst.addr.in_addr, port);
encode_my_addr(con->msgr); encode_my_addr(con->msgr);
dout("process_connect learned my addr is %s\n", dout("process_banner learned my addr is %s\n",
pr_addr(&con->msgr->inst.addr.in_addr)); pr_addr(&con->msgr->inst.addr.in_addr));
} }
set_bit(NEGOTIATING, &con->state);
prepare_read_connect(con);
return 0;
}
static int process_connect(struct ceph_connection *con)
{
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
switch (con->in_reply.tag) { switch (con->in_reply.tag) {
case CEPH_MSGR_TAG_BADPROTOVER: case CEPH_MSGR_TAG_BADPROTOVER:
dout("process_connect got BADPROTOVER my %d != their %d\n", dout("process_connect got BADPROTOVER my %d != their %d\n",
...@@ -1053,7 +1077,7 @@ static int process_connect(struct ceph_connection *con) ...@@ -1053,7 +1077,7 @@ static int process_connect(struct ceph_connection *con)
ENTITY_NAME(con->peer_name), ENTITY_NAME(con->peer_name),
pr_addr(&con->peer_addr.in_addr)); pr_addr(&con->peer_addr.in_addr));
reset_connection(con); reset_connection(con);
prepare_write_connect_retry(con->msgr, con); prepare_write_connect(con->msgr, con, 0);
prepare_read_connect(con); prepare_read_connect(con);
/* Tell ceph about it. */ /* Tell ceph about it. */
...@@ -1071,7 +1095,7 @@ static int process_connect(struct ceph_connection *con) ...@@ -1071,7 +1095,7 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->out_connect.connect_seq),
le32_to_cpu(con->in_connect.connect_seq)); le32_to_cpu(con->in_connect.connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
prepare_write_connect_retry(con->msgr, con); prepare_write_connect(con->msgr, con, 0);
prepare_read_connect(con); prepare_read_connect(con);
break; break;
...@@ -1080,19 +1104,17 @@ static int process_connect(struct ceph_connection *con) ...@@ -1080,19 +1104,17 @@ static int process_connect(struct ceph_connection *con)
* If we sent a smaller global_seq than the peer has, try * If we sent a smaller global_seq than the peer has, try
* again with a larger value. * again with a larger value.
*/ */
dout("process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n", dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
con->peer_global_seq, con->peer_global_seq,
le32_to_cpu(con->in_connect.global_seq)); le32_to_cpu(con->in_connect.global_seq));
get_global_seq(con->msgr, get_global_seq(con->msgr,
le32_to_cpu(con->in_connect.global_seq)); le32_to_cpu(con->in_connect.global_seq));
prepare_write_connect_retry(con->msgr, con); prepare_write_connect(con->msgr, con, 0);
prepare_read_connect(con); prepare_read_connect(con);
break; break;
case CEPH_MSGR_TAG_READY: case CEPH_MSGR_TAG_READY:
clear_bit(CONNECTING, &con->state); clear_bit(CONNECTING, &con->state);
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
set_bit(LOSSYRX, &con->state);
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++; con->connect_seq++;
dout("process_connect got READY gseq %d cseq %d (%d)\n", dout("process_connect got READY gseq %d cseq %d (%d)\n",
...@@ -1420,9 +1442,11 @@ static int try_write(struct ceph_connection *con) ...@@ -1420,9 +1442,11 @@ static int try_write(struct ceph_connection *con)
if (test_and_clear_bit(STANDBY, &con->state)) if (test_and_clear_bit(STANDBY, &con->state))
con->connect_seq++; con->connect_seq++;
prepare_write_connect(msgr, con); prepare_write_banner(msgr, con);
prepare_read_connect(con); prepare_write_connect(msgr, con, 1);
prepare_read_banner(con);
set_bit(CONNECTING, &con->state); set_bit(CONNECTING, &con->state);
clear_bit(NEGOTIATING, &con->state);
con->in_tag = CEPH_MSGR_TAG_READY; con->in_tag = CEPH_MSGR_TAG_READY;
dout("try_write initiating connect on %p new state %lu\n", dout("try_write initiating connect on %p new state %lu\n",
...@@ -1521,7 +1545,16 @@ static int try_read(struct ceph_connection *con) ...@@ -1521,7 +1545,16 @@ static int try_read(struct ceph_connection *con)
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos); con->in_base_pos);
if (test_bit(CONNECTING, &con->state)) { if (test_bit(CONNECTING, &con->state)) {
dout("try_read connecting\n"); if (!test_bit(NEGOTIATING, &con->state)) {
dout("try_read connecting\n");
ret = read_partial_banner(con);
if (ret <= 0)
goto done;
if (process_banner(con) < 0) {
ret = -1;
goto out;
}
}
ret = read_partial_connect(con); ret = read_partial_connect(con);
if (ret <= 0) if (ret <= 0)
goto done; goto done;
......
...@@ -104,8 +104,8 @@ struct ceph_msg_pos { ...@@ -104,8 +104,8 @@ struct ceph_msg_pos {
* thread is currently opening, reading or writing data to the socket. * thread is currently opening, reading or writing data to the socket.
*/ */
#define LOSSYTX 0 /* we can close channel or drop messages on errors */ #define LOSSYTX 0 /* we can close channel or drop messages on errors */
#define LOSSYRX 1 /* peer may reset/drop messages */ #define CONNECTING 1
#define CONNECTING 2 #define NEGOTIATING 2
#define KEEPALIVE_PENDING 3 #define KEEPALIVE_PENDING 3
#define WRITE_PENDING 4 /* we have data ready to send */ #define WRITE_PENDING 4 /* we have data ready to send */
#define QUEUED 5 /* there is work queued on this connection */ #define QUEUED 5 /* there is work queued on this connection */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment