Commit 4cd34e7c authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Simplify dealing with aborted partially transmitted messages

If the previous message was only partially transmitted, we need to close
the socket in order to avoid corruption of the message stream. To do so,
we currently hijack the unlocking of the socket in order to schedule
the close.
Now that we track the message offset in the socket state, we can move
that kind of checking out of the socket lock code, which is needed to
allow messages to remain queued after dropping the socket lock.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 6c7a64e5
...@@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task) ...@@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task)
return ret; return ret;
} }
/*
* Determine if the previous message in the stream was aborted before it
* could complete transmission.
*/
static bool
xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
{
return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
}
/* /*
* Construct a stream transport record marker in @buf. * Construct a stream transport record marker in @buf.
*/ */
...@@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task) ...@@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task)
int status; int status;
int sent = 0; int sent = 0;
/* Close the stream if the previous transmission was incomplete */
if (xs_send_request_was_aborted(transport, req)) {
xs_close(xprt);
return -ENOTCONN;
}
xs_encode_stream_record_marker(&req->rq_snd_buf); xs_encode_stream_record_marker(&req->rq_snd_buf);
xs_pktdump("packet data:", xs_pktdump("packet data:",
...@@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task) ...@@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
int status; int status;
int sent; int sent;
/* Close the stream if the previous transmission was incomplete */
if (xs_send_request_was_aborted(transport, req)) {
if (transport->sock != NULL)
kernel_sock_shutdown(transport->sock, SHUT_RDWR);
return -ENOTCONN;
}
xs_encode_stream_record_marker(&req->rq_snd_buf); xs_encode_stream_record_marker(&req->rq_snd_buf);
xs_pktdump("packet data:", xs_pktdump("packet data:",
...@@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task) ...@@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
return status; return status;
} }
/**
* xs_tcp_release_xprt - clean up after a tcp transmission
* @xprt: transport
* @task: rpc task
*
* This cleans up if an error causes us to abort the transmission of a request.
* In this case, the socket may need to be reset in order to avoid confusing
* the server.
*/
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
if (task != xprt->snd_task)
return;
if (task == NULL)
goto out_release;
if (transport->xmit.offset == 0 || !xprt_connected(xprt))
goto out_release;
set_bit(XPRT_CLOSE_WAIT, &xprt->state);
out_release:
xprt_release_xprt(xprt, task);
}
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{ {
transport->old_data_ready = sk->sk_data_ready; transport->old_data_ready = sk->sk_data_ready;
...@@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt) ...@@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
static const struct rpc_xprt_ops xs_local_ops = { static const struct rpc_xprt_ops xs_local_ops = {
.reserve_xprt = xprt_reserve_xprt, .reserve_xprt = xprt_reserve_xprt,
.release_xprt = xs_tcp_release_xprt, .release_xprt = xprt_release_xprt,
.alloc_slot = xprt_alloc_slot, .alloc_slot = xprt_alloc_slot,
.free_slot = xprt_free_slot, .free_slot = xprt_free_slot,
.rpcbind = xs_local_rpcbind, .rpcbind = xs_local_rpcbind,
...@@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = { ...@@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = {
static const struct rpc_xprt_ops xs_tcp_ops = { static const struct rpc_xprt_ops xs_tcp_ops = {
.reserve_xprt = xprt_reserve_xprt, .reserve_xprt = xprt_reserve_xprt,
.release_xprt = xs_tcp_release_xprt, .release_xprt = xprt_release_xprt,
.alloc_slot = xprt_lock_and_alloc_slot, .alloc_slot = xprt_lock_and_alloc_slot,
.free_slot = xprt_free_slot, .free_slot = xprt_free_slot,
.rpcbind = rpcb_getport_async, .rpcbind = rpcb_getport_async,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment