Commit 571f3dd0 authored by David S. Miller's avatar David S. Miller

Merge tag 'rxrpc-fixes-20230107' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Fix race between call connection, data transmit and call disconnect

Here are patches to fix an oops[1] caused by a race between call
connection, initial packet transmission and call disconnection which
results in something like:

        kernel BUG at net/rxrpc/peer_object.c:413!

when the syzbot test is run.  The problem is that the connection procedure
is effectively split across two threads and can get expanded by taking an
interrupt, thereby adding the call to the peer error distribution list
*after* it has been disconnected (say by the rxrpc socket shutting down).

The easiest solution is to look at the fourth set of I/O thread
conversion/SACK table expansion patches that didn't get applied[2] and take
from it those patches that move call connection and disconnection into the
I/O thread.  Moving these things into the I/O thread means that the
sequencing is managed by all being done in the same thread - and the race
can no longer happen.

This is preferable to introducing an extra lock as adding an extra lock
would make the I/O thread have to wait for the app thread in yet another
place.

The changes can be considered as a number of logical parts:

 (1) Move all of the call state changes into the I/O thread.

 (2) Make client connection ID space per-local endpoint so that the I/O
     thread doesn't need locks to access it.

 (3) Move actual abort generation into the I/O thread and clean it up.  If
     sendmsg or recvmsg want to cause an abort, they have to delegate it.

 (4) Offload the setting up of the security context on a connection to the
     thread of one of the apps that's starting a call.  We don't want to be
     doing any sort of crypto in the I/O thread.

 (5) Connect calls (ie. assign them to channel slots on connections) in the
     I/O thread.  Calls are set up by sendmsg/kafs and passed to the I/O
     thread to connect.  Connections are allocated in the I/O thread after
     this.

 (6) Disconnect calls in the I/O thread.

I've also added a patch for an unrelated bug that cropped up during
testing, whereby a race can occur between an incoming call and socket
shutdown.

Note that whilst this fixes the original syzbot bug, another bug may get
triggered if this one is fixed:

        INFO: rcu detected stall in corrupted
        rcu: INFO: rcu_preempt detected expedited stalls on CPUs/tasks: { P5792 } 2657 jiffies s: 2825 root: 0x0/T
        rcu: blocking rcu_node structures (internal RCU debug):

It doesn't look this should be anything to do with rxrpc, though, as I've
tested an additional patch[3] that removes practically all the RCU usage
from rxrpc and it still occurs.  It seems likely that it is being caused by
something in the tunnelling setup that the syzbot test does, but there's
not enough info to go on.  It also seems unlikely to be anything to do with
the afs driver as the test doesn't use that.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents b4e9b876 42f229c3
......@@ -880,8 +880,8 @@ The kernel interface functions are as follows:
notify_end_rx can be NULL or it can be used to specify a function to be
called when the call changes state to end the Tx phase. This function is
called with the call-state spinlock held to prevent any reply or final ACK
from being delivered first.
called with a spinlock held to prevent the last DATA packet from being
transmitted until the function returns.
(#) Receive data from a call::
......
......@@ -13,6 +13,8 @@
#include "internal.h"
#include "afs_cm.h"
#include "protocol_yfs.h"
#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
#include <trace/events/rxrpc.h>
static int afs_deliver_cb_init_call_back_state(struct afs_call *);
static int afs_deliver_cb_init_call_back_state3(struct afs_call *);
......@@ -191,7 +193,7 @@ static void afs_cm_destructor(struct afs_call *call)
* Abort a service call from within an action function.
*/
static void afs_abort_service_call(struct afs_call *call, u32 abort_code, int error,
const char *why)
enum rxrpc_abort_reason why)
{
rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
abort_code, error, why);
......@@ -469,7 +471,7 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
if (memcmp(r, &call->net->uuid, sizeof(call->net->uuid)) == 0)
afs_send_empty_reply(call);
else
afs_abort_service_call(call, 1, 1, "K-1");
afs_abort_service_call(call, 1, 1, afs_abort_probeuuid_negative);
afs_put_call(call);
_leave("");
......
......@@ -13,6 +13,8 @@
#include "internal.h"
#include "afs_cm.h"
#include "protocol_yfs.h"
#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
#include <trace/events/rxrpc.h>
struct workqueue_struct *afs_async_calls;
......@@ -397,7 +399,8 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
error_do_abort:
if (ret != -ECONNABORTED) {
rxrpc_kernel_abort_call(call->net->socket, rxcall,
RX_USER_ABORT, ret, "KSD");
RX_USER_ABORT, ret,
afs_abort_send_data_error);
} else {
len = 0;
iov_iter_kvec(&msg.msg_iter, ITER_DEST, NULL, 0, 0);
......@@ -527,7 +530,8 @@ static void afs_deliver_to_call(struct afs_call *call)
case -ENOTSUPP:
abort_code = RXGEN_OPCODE;
rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
abort_code, ret, "KIV");
abort_code, ret,
afs_abort_op_not_supported);
goto local_abort;
case -EIO:
pr_err("kAFS: Call %u in bad state %u\n",
......@@ -542,12 +546,14 @@ static void afs_deliver_to_call(struct afs_call *call)
if (state != AFS_CALL_CL_AWAIT_REPLY)
abort_code = RXGEN_SS_UNMARSHAL;
rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
abort_code, ret, "KUM");
abort_code, ret,
afs_abort_unmarshal_error);
goto local_abort;
default:
abort_code = RX_CALL_DEAD;
rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
abort_code, ret, "KER");
abort_code, ret,
afs_abort_general_error);
goto local_abort;
}
}
......@@ -619,7 +625,8 @@ long afs_wait_for_call_to_complete(struct afs_call *call,
/* Kill off the call if it's still live. */
_debug("call interrupted");
if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
RX_USER_ABORT, -EINTR, "KWI"))
RX_USER_ABORT, -EINTR,
afs_abort_interrupted))
afs_set_call_complete(call, -EINTR, 0);
}
}
......@@ -836,7 +843,8 @@ void afs_send_empty_reply(struct afs_call *call)
case -ENOMEM:
_debug("oom");
rxrpc_kernel_abort_call(net->socket, call->rxcall,
RXGEN_SS_MARSHAL, -ENOMEM, "KOO");
RXGEN_SS_MARSHAL, -ENOMEM,
afs_abort_oom);
fallthrough;
default:
_leave(" [error]");
......@@ -878,7 +886,8 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
if (n == -ENOMEM) {
_debug("oom");
rxrpc_kernel_abort_call(net->socket, call->rxcall,
RXGEN_SS_MARSHAL, -ENOMEM, "KOO");
RXGEN_SS_MARSHAL, -ENOMEM,
afs_abort_oom);
}
_leave(" [error]");
}
......@@ -900,6 +909,7 @@ int afs_extract_data(struct afs_call *call, bool want_more)
ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
&call->iov_len, want_more, &remote_abort,
&call->service_id);
trace_afs_receive_data(call, call->iter, want_more, ret);
if (ret == 0 || ret == -EAGAIN)
return ret;
......
......@@ -15,6 +15,7 @@ struct key;
struct sock;
struct socket;
struct rxrpc_call;
enum rxrpc_abort_reason;
enum rxrpc_interruptibility {
RXRPC_INTERRUPTIBLE, /* Call is interruptible */
......@@ -55,7 +56,7 @@ int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
struct iov_iter *, size_t *, bool, u32 *, u16 *);
bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
u32, int, const char *);
u32, int, enum rxrpc_abort_reason);
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *);
......
This diff is collapsed.
......@@ -10,6 +10,7 @@ rxrpc-y := \
call_accept.o \
call_event.o \
call_object.o \
call_state.o \
conn_client.o \
conn_event.o \
conn_object.o \
......
......@@ -155,10 +155,10 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
if (service_id) {
write_lock(&local->services_lock);
if (rcu_access_pointer(local->service))
if (local->service)
goto service_in_use;
rx->local = local;
rcu_assign_pointer(local->service, rx);
local->service = rx;
write_unlock(&local->services_lock);
rx->sk.sk_state = RXRPC_SERVER_BOUND;
......@@ -328,7 +328,6 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
mutex_unlock(&call->user_mutex);
}
rxrpc_put_peer(cp.peer, rxrpc_peer_put_discard_tmp);
_leave(" = %p", call);
return call;
}
......@@ -374,13 +373,17 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
* @sock: The socket the call is on
* @call: The call to check
*
* Allow a kernel service to find out whether a call is still alive -
* ie. whether it has completed.
* Allow a kernel service to find out whether a call is still alive - whether
* it has completed successfully and all received data has been consumed.
*/
bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call)
{
return call->state != RXRPC_CALL_COMPLETE;
if (!rxrpc_call_is_complete(call))
return true;
if (call->completion != RXRPC_CALL_SUCCEEDED)
return false;
return !skb_queue_empty(&call->recvmsg_queue);
}
EXPORT_SYMBOL(rxrpc_kernel_check_life);
......@@ -872,9 +875,9 @@ static int rxrpc_release_sock(struct sock *sk)
sk->sk_state = RXRPC_CLOSE;
if (rx->local && rcu_access_pointer(rx->local->service) == rx) {
if (rx->local && rx->local->service == rx) {
write_lock(&rx->local->services_lock);
rcu_assign_pointer(rx->local->service, NULL);
rx->local->service = NULL;
write_unlock(&rx->local->services_lock);
}
......@@ -957,16 +960,9 @@ static const struct net_proto_family rxrpc_family_ops = {
static int __init af_rxrpc_init(void)
{
int ret = -1;
unsigned int tmp;
BUILD_BUG_ON(sizeof(struct rxrpc_skb_priv) > sizeof_field(struct sk_buff, cb));
get_random_bytes(&tmp, sizeof(tmp));
tmp &= 0x3fffffff;
if (tmp == 0)
tmp = 1;
idr_set_cursor(&rxrpc_client_conn_ids, tmp);
ret = -ENOMEM;
rxrpc_call_jar = kmem_cache_create(
"rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
......@@ -1062,7 +1058,6 @@ static void __exit af_rxrpc_exit(void)
* are released.
*/
rcu_barrier();
rxrpc_destroy_client_conn_ids();
destroy_workqueue(rxrpc_workqueue);
rxrpc_exit_security();
......
This diff is collapsed.
......@@ -99,7 +99,7 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
if (!call)
return -ENOMEM;
call->flags |= (1 << RXRPC_CALL_IS_SERVICE);
call->state = RXRPC_CALL_SERVER_PREALLOC;
rxrpc_set_call_state(call, RXRPC_CALL_SERVER_PREALLOC);
__set_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events);
trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
......@@ -280,7 +280,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
(peer_tail + 1) &
(RXRPC_BACKLOG_MAX - 1));
rxrpc_new_incoming_peer(rx, local, peer);
rxrpc_new_incoming_peer(local, peer);
}
/* Now allocate and set up the connection */
......@@ -326,11 +326,11 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
* If we want to report an error, we mark the skb with the packet type and
* abort code and return false.
*/
int rxrpc_new_incoming_call(struct rxrpc_local *local,
struct rxrpc_peer *peer,
struct rxrpc_connection *conn,
struct sockaddr_rxrpc *peer_srx,
struct sk_buff *skb)
bool rxrpc_new_incoming_call(struct rxrpc_local *local,
struct rxrpc_peer *peer,
struct rxrpc_connection *conn,
struct sockaddr_rxrpc *peer_srx,
struct sk_buff *skb)
{
const struct rxrpc_security *sec = NULL;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
......@@ -339,18 +339,17 @@ int rxrpc_new_incoming_call(struct rxrpc_local *local,
_enter("");
/* Don't set up a call for anything other than the first DATA packet. */
if (sp->hdr.seq != 1 ||
sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
return 0; /* Just discard */
/* Don't set up a call for anything other than a DATA packet. */
if (sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
return rxrpc_protocol_error(skb, rxrpc_eproto_no_service_call);
rcu_read_lock();
read_lock(&local->services_lock);
/* Weed out packets to services we're not offering. Packets that would
* begin a call are explicitly rejected and the rest are just
* discarded.
*/
rx = rcu_dereference(local->service);
rx = local->service;
if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
sp->hdr.serviceId != rx->second_service)
) {
......@@ -363,16 +362,14 @@ int rxrpc_new_incoming_call(struct rxrpc_local *local,
if (!conn) {
sec = rxrpc_get_incoming_security(rx, skb);
if (!sec)
goto reject;
goto unsupported_security;
}
spin_lock(&rx->incoming_lock);
if (rx->sk.sk_state == RXRPC_SERVER_LISTEN_DISABLED ||
rx->sk.sk_state == RXRPC_CLOSE) {
trace_rxrpc_abort(0, "CLS", sp->hdr.cid, sp->hdr.callNumber,
sp->hdr.seq, RX_INVALID_OPERATION, ESHUTDOWN);
skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
skb->priority = RX_INVALID_OPERATION;
rxrpc_direct_abort(skb, rxrpc_abort_shut_down,
RX_INVALID_OPERATION, -ESHUTDOWN);
goto no_call;
}
......@@ -402,7 +399,7 @@ int rxrpc_new_incoming_call(struct rxrpc_local *local,
spin_unlock(&conn->state_lock);
spin_unlock(&rx->incoming_lock);
rcu_read_unlock();
read_unlock(&local->services_lock);
if (hlist_unhashed(&call->error_link)) {
spin_lock(&call->peer->lock);
......@@ -413,22 +410,24 @@ int rxrpc_new_incoming_call(struct rxrpc_local *local,
_leave(" = %p{%d}", call, call->debug_id);
rxrpc_input_call_event(call, skb);
rxrpc_put_call(call, rxrpc_call_put_input);
return 0;
return true;
unsupported_service:
trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_INVALID_OPERATION, EOPNOTSUPP);
skb->priority = RX_INVALID_OPERATION;
goto reject;
read_unlock(&local->services_lock);
return rxrpc_direct_abort(skb, rxrpc_abort_service_not_offered,
RX_INVALID_OPERATION, -EOPNOTSUPP);
unsupported_security:
read_unlock(&local->services_lock);
return rxrpc_direct_abort(skb, rxrpc_abort_service_not_offered,
RX_INVALID_OPERATION, -EKEYREJECTED);
no_call:
spin_unlock(&rx->incoming_lock);
reject:
rcu_read_unlock();
read_unlock(&local->services_lock);
_leave(" = f [%u]", skb->mark);
return -EPROTO;
return false;
discard:
rcu_read_unlock();
return 0;
read_unlock(&local->services_lock);
return true;
}
/*
......
......@@ -251,6 +251,41 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
_leave("");
}
/*
* Start transmitting the reply to a service. This cancels the need to ACK the
* request if we haven't yet done so.
*/
static void rxrpc_begin_service_reply(struct rxrpc_call *call)
{
unsigned long now = jiffies;
rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
if (call->ackr_reason == RXRPC_ACK_DELAY)
call->ackr_reason = 0;
trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
}
/*
* Close the transmission phase. After this point there is no more data to be
* transmitted in the call.
*/
static void rxrpc_close_tx_phase(struct rxrpc_call *call)
{
_debug("________awaiting reply/ACK__________");
switch (__rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
break;
case RXRPC_CALL_SERVER_SEND_REPLY:
rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
break;
default:
break;
}
}
static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
{
unsigned int winsize = min_t(unsigned int, call->tx_winsize,
......@@ -270,9 +305,11 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
{
struct rxrpc_txbuf *txb;
if (rxrpc_is_client_call(call) &&
!test_bit(RXRPC_CALL_EXPOSED, &call->flags))
if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
if (list_empty(&call->tx_sendmsg))
return;
rxrpc_expose_client_call(call);
}
while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
struct rxrpc_txbuf, call_link))) {
......@@ -283,6 +320,9 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
call->tx_top = txb->seq;
list_add_tail(&txb->call_link, &call->tx_buffer);
if (txb->wire.flags & RXRPC_LAST_PACKET)
rxrpc_close_tx_phase(call);
rxrpc_transmit_one(call, txb);
if (!rxrpc_tx_window_has_space(call))
......@@ -292,16 +332,15 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
static void rxrpc_transmit_some_data(struct rxrpc_call *call)
{
switch (call->state) {
switch (__rxrpc_call_state(call)) {
case RXRPC_CALL_SERVER_ACK_REQUEST:
if (list_empty(&call->tx_sendmsg))
return;
rxrpc_begin_service_reply(call);
fallthrough;
case RXRPC_CALL_SERVER_SEND_REPLY:
case RXRPC_CALL_SERVER_AWAIT_ACK:
case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
if (!rxrpc_tx_window_has_space(call))
return;
if (list_empty(&call->tx_sendmsg)) {
......@@ -331,21 +370,31 @@ static void rxrpc_send_initial_ping(struct rxrpc_call *call)
/*
* Handle retransmission and deferred ACK/abort generation.
*/
void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
{
unsigned long now, next, t;
rxrpc_serial_t ackr_serial;
bool resend = false, expired = false;
s32 abort_code;
rxrpc_see_call(call, rxrpc_call_see_input);
//printk("\n--------------------\n");
_enter("{%d,%s,%lx}",
call->debug_id, rxrpc_call_states[call->state], call->events);
call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
call->events);
if (call->state == RXRPC_CALL_COMPLETE)
if (__rxrpc_call_is_complete(call))
goto out;
/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
abort_code = smp_load_acquire(&call->send_abort);
if (abort_code) {
rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err,
call->send_abort_why);
goto out;
}
if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
goto out;
......@@ -358,7 +407,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
}
t = READ_ONCE(call->expect_req_by);
if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST &&
time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
expired = true;
......@@ -429,11 +478,12 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
(int)call->conn->hi_serial - (int)call->rx_serial > 0) {
trace_rxrpc_call_reset(call);
rxrpc_abort_call("EXP", call, 0, RX_CALL_DEAD, -ECONNRESET);
rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET,
rxrpc_abort_call_reset);
} else {
rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME,
rxrpc_abort_call_timeout);
}
rxrpc_send_abort_packet(call);
goto out;
}
......@@ -441,7 +491,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_lost_ack);
if (resend && call->state != RXRPC_CALL_CLIENT_RECV_REPLY)
if (resend && __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_resend(call, NULL);
if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
......@@ -453,7 +503,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
rxrpc_propose_ack_input_data);
/* Make sure the timer is restarted */
if (call->state != RXRPC_CALL_COMPLETE) {
if (!__rxrpc_call_is_complete(call)) {
next = call->expect_rx_by;
#define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; }
......@@ -474,9 +524,15 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
}
out:
if (call->state == RXRPC_CALL_COMPLETE)
if (__rxrpc_call_is_complete(call)) {
del_timer_sync(&call->timer);
if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
rxrpc_disconnect_call(call);
if (call->security)
call->security->free_call_crypto(call);
}
if (call->acks_hard_ack != call->tx_bottom)
rxrpc_shrink_call_tx_buffer(call);
_leave("");
return true;
}
This diff is collapsed.
// SPDX-License-Identifier: GPL-2.0-or-later
/* Call state changing functions.
*
* Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
* Written by David Howells (dhowells@redhat.com)
*/
#include "ar-internal.h"
/*
* Transition a call to the complete state.
*/
bool rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
if (__rxrpc_call_state(call) == RXRPC_CALL_COMPLETE)
return false;
call->abort_code = abort_code;
call->error = error;
call->completion = compl;
/* Allow reader of completion state to operate locklessly */
rxrpc_set_call_state(call, RXRPC_CALL_COMPLETE);
trace_rxrpc_call_complete(call);
wake_up(&call->waitq);
rxrpc_notify_socket(call);
return true;
}
/*
* Record that a call successfully completed.
*/
bool rxrpc_call_completed(struct rxrpc_call *call)
{
return rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}
/*
* Record that a call is locally aborted.
*/
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
u32 abort_code, int error, enum rxrpc_abort_reason why)
{
trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
abort_code, error);
if (!rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
abort_code, error))
return false;
if (test_bit(RXRPC_CALL_EXPOSED, &call->flags))
rxrpc_send_abort_packet(call);
return true;
}
/*
* Record that a call errored out before even getting off the ground, thereby
* setting the state to allow it to be destroyed.
*/
void rxrpc_prefail_call(struct rxrpc_call *call, enum rxrpc_call_completion compl,
int error)
{
call->abort_code = RX_CALL_DEAD;
call->error = error;
call->completion = compl;
call->_state = RXRPC_CALL_COMPLETE;
trace_rxrpc_call_complete(call);
WARN_ON_ONCE(__test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags));
}
This diff is collapsed.
This diff is collapsed.
......@@ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work);
static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
unsigned long reap_at);
void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
{
struct rxrpc_local *local = conn->local;
bool busy;
if (WARN_ON_ONCE(!local))
return;
spin_lock_bh(&local->lock);
busy = !list_empty(&conn->attend_link);
if (!busy) {
rxrpc_get_connection(conn, why);
list_add_tail(&conn->attend_link, &local->conn_attend_q);
}
spin_unlock_bh(&local->lock);
rxrpc_wake_up_io_thread(local);
}
static void rxrpc_connection_timer(struct timer_list *timer)
{
struct rxrpc_connection *conn =
container_of(timer, struct rxrpc_connection, timer);
rxrpc_queue_conn(conn, rxrpc_conn_queue_timer);
rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer);
}
/*
......@@ -49,6 +67,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
INIT_WORK(&conn->destructor, rxrpc_clean_up_connection);
INIT_LIST_HEAD(&conn->proc_link);
INIT_LIST_HEAD(&conn->link);
mutex_init(&conn->security_lock);
skb_queue_head_init(&conn->rx_queue);
conn->rxnet = rxnet;
conn->security = &rxrpc_no_security;
......@@ -82,10 +101,10 @@ struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *lo
_enter(",%x", sp->hdr.cid & RXRPC_CIDMASK);
/* Look up client connections by connection ID alone as their IDs are
* unique for this machine.
/* Look up client connections by connection ID alone as their
* IDs are unique for this machine.
*/
conn = idr_find(&rxrpc_client_conn_ids, sp->hdr.cid >> RXRPC_CIDSHIFT);
conn = idr_find(&local->conn_ids, sp->hdr.cid >> RXRPC_CIDSHIFT);
if (!conn || refcount_read(&conn->ref) == 0) {
_debug("no conn");
goto not_found;
......@@ -139,7 +158,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
_enter("%d,%x", conn->debug_id, call->cid);
if (rcu_access_pointer(chan->call) == call) {
if (chan->call == call) {
/* Save the result of the call so that we can repeat it if necessary
* through the channel, whilst disposing of the actual call record.
*/
......@@ -159,12 +178,9 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
break;
}
/* Sync with rxrpc_conn_retransmit(). */
smp_wmb();
chan->last_call = chan->call_id;
chan->call_id = chan->call_counter;
rcu_assign_pointer(chan->call, NULL);
chan->call = NULL;
}
_leave("");
......@@ -178,6 +194,9 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
{
struct rxrpc_connection *conn = call->conn;
set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
rxrpc_see_call(call, rxrpc_call_see_disconnected);
call->peer->cong_ssthresh = call->cong_ssthresh;
if (!hlist_unhashed(&call->error_link)) {
......@@ -186,18 +205,17 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
spin_unlock(&call->peer->lock);
}
if (rxrpc_is_client_call(call))
return rxrpc_disconnect_client_call(conn->bundle, call);
spin_lock(&conn->bundle->channel_lock);
__rxrpc_disconnect_call(conn, call);
spin_unlock(&conn->bundle->channel_lock);
if (rxrpc_is_client_call(call)) {
rxrpc_disconnect_client_call(call->bundle, call);
} else {
__rxrpc_disconnect_call(conn, call);
conn->idle_timestamp = jiffies;
if (atomic_dec_and_test(&conn->active))
rxrpc_set_service_reap_timer(conn->rxnet,
jiffies + rxrpc_connection_expiry);
}
set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
conn->idle_timestamp = jiffies;
if (atomic_dec_and_test(&conn->active))
rxrpc_set_service_reap_timer(conn->rxnet,
jiffies + rxrpc_connection_expiry);
rxrpc_put_call(call, rxrpc_call_put_io_thread);
}
/*
......@@ -293,10 +311,10 @@ static void rxrpc_clean_up_connection(struct work_struct *work)
container_of(work, struct rxrpc_connection, destructor);
struct rxrpc_net *rxnet = conn->rxnet;
ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
!rcu_access_pointer(conn->channels[1].call) &&
!rcu_access_pointer(conn->channels[2].call) &&
!rcu_access_pointer(conn->channels[3].call));
ASSERT(!conn->channels[0].call &&
!conn->channels[1].call &&
!conn->channels[2].call &&
!conn->channels[3].call);
ASSERT(list_empty(&conn->cache_link));
del_timer_sync(&conn->timer);
......@@ -447,7 +465,6 @@ void rxrpc_destroy_all_connections(struct rxrpc_net *rxnet)
_enter("");
atomic_dec(&rxnet->nr_conns);
rxrpc_destroy_all_client_connections(rxnet);
del_timer_sync(&rxnet->service_conn_reap_timer);
rxrpc_queue_work(&rxnet->service_conn_reaper);
......
......@@ -11,7 +11,6 @@
static struct rxrpc_bundle rxrpc_service_dummy_bundle = {
.ref = REFCOUNT_INIT(1),
.debug_id = UINT_MAX,
.channel_lock = __SPIN_LOCK_UNLOCKED(&rxrpc_service_dummy_bundle.channel_lock),
};
/*
......
This diff is collapsed.
......@@ -43,25 +43,17 @@ static void none_free_call_crypto(struct rxrpc_call *call)
}
static int none_respond_to_challenge(struct rxrpc_connection *conn,
struct sk_buff *skb,
u32 *_abort_code)
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
tracepoint_string("chall_none"));
return -EPROTO;
return rxrpc_abort_conn(conn, skb, RX_PROTOCOL_ERROR, -EPROTO,
rxrpc_eproto_rxnull_challenge);
}
static int none_verify_response(struct rxrpc_connection *conn,
struct sk_buff *skb,
u32 *_abort_code)
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
tracepoint_string("resp_none"));
return -EPROTO;
return rxrpc_abort_conn(conn, skb, RX_PROTOCOL_ERROR, -EPROTO,
rxrpc_eproto_rxnull_response);
}
static void none_clear(struct rxrpc_connection *conn)
......
This diff is collapsed.
......@@ -82,31 +82,59 @@ static long rxrpc_local_cmp_key(const struct rxrpc_local *local,
}
}
static void rxrpc_client_conn_reap_timeout(struct timer_list *timer)
{
struct rxrpc_local *local =
container_of(timer, struct rxrpc_local, client_conn_reap_timer);
if (local->kill_all_client_conns &&
test_and_set_bit(RXRPC_CLIENT_CONN_REAP_TIMER, &local->client_conn_flags))
rxrpc_wake_up_io_thread(local);
}
/*
* Allocate a new local endpoint.
*/
static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
const struct sockaddr_rxrpc *srx)
{
struct rxrpc_local *local;
u32 tmp;
local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
if (local) {
refcount_set(&local->ref, 1);
atomic_set(&local->active_users, 1);
local->rxnet = rxnet;
local->net = net;
local->rxnet = rxrpc_net(net);
INIT_HLIST_NODE(&local->link);
init_rwsem(&local->defrag_sem);
init_completion(&local->io_thread_ready);
skb_queue_head_init(&local->rx_queue);
INIT_LIST_HEAD(&local->conn_attend_q);
INIT_LIST_HEAD(&local->call_attend_q);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
local->kill_all_client_conns = false;
INIT_LIST_HEAD(&local->idle_client_conns);
timer_setup(&local->client_conn_reap_timer,
rxrpc_client_conn_reap_timeout, 0);
spin_lock_init(&local->lock);
rwlock_init(&local->services_lock);
local->debug_id = atomic_inc_return(&rxrpc_debug_id);
memcpy(&local->srx, srx, sizeof(*srx));
local->srx.srx_service = 0;
idr_init(&local->conn_ids);
get_random_bytes(&tmp, sizeof(tmp));
tmp &= 0x3fffffff;
if (tmp == 0)
tmp = 1;
idr_set_cursor(&local->conn_ids, tmp);
INIT_LIST_HEAD(&local->new_client_calls);
spin_lock_init(&local->client_call_lock);
trace_rxrpc_local(local->debug_id, rxrpc_local_new, 1, 1);
}
......@@ -248,7 +276,7 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
goto found;
}
local = rxrpc_alloc_local(rxnet, srx);
local = rxrpc_alloc_local(net, srx);
if (!local)
goto nomem;
......@@ -407,6 +435,7 @@ void rxrpc_destroy_local(struct rxrpc_local *local)
* local endpoint.
*/
rxrpc_purge_queue(&local->rx_queue);
rxrpc_purge_client_connections(local);
}
/*
......
......@@ -10,15 +10,6 @@
unsigned int rxrpc_net_id;
static void rxrpc_client_conn_reap_timeout(struct timer_list *timer)
{
struct rxrpc_net *rxnet =
container_of(timer, struct rxrpc_net, client_conn_reap_timer);
if (rxnet->live)
rxrpc_queue_work(&rxnet->client_conn_reaper);
}
static void rxrpc_service_conn_reap_timeout(struct timer_list *timer)
{
struct rxrpc_net *rxnet =
......@@ -63,14 +54,6 @@ static __net_init int rxrpc_init_net(struct net *net)
rxrpc_service_conn_reap_timeout, 0);
atomic_set(&rxnet->nr_client_conns, 0);
rxnet->kill_all_client_conns = false;
spin_lock_init(&rxnet->client_conn_cache_lock);
mutex_init(&rxnet->client_conn_discard_lock);
INIT_LIST_HEAD(&rxnet->idle_client_conns);
INIT_WORK(&rxnet->client_conn_reaper,
rxrpc_discard_expired_client_conns);
timer_setup(&rxnet->client_conn_reap_timer,
rxrpc_client_conn_reap_timeout, 0);
INIT_HLIST_HEAD(&rxnet->local_endpoints);
mutex_init(&rxnet->local_mutex);
......
......@@ -261,7 +261,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
rxrpc_tx_point_call_ack);
rxrpc_tx_backoff(call, ret);
if (call->state < RXRPC_CALL_COMPLETE) {
if (!__rxrpc_call_is_complete(call)) {
if (ret < 0)
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
rxrpc_set_keepalive(call);
......@@ -544,6 +544,62 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
goto done;
}
/*
* Transmit a connection-level abort.
*/
void rxrpc_send_conn_abort(struct rxrpc_connection *conn)
{
struct rxrpc_wire_header whdr;
struct msghdr msg;
struct kvec iov[2];
__be32 word;
size_t len;
u32 serial;
int ret;
msg.msg_name = &conn->peer->srx.transport;
msg.msg_namelen = conn->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
whdr.epoch = htonl(conn->proto.epoch);
whdr.cid = htonl(conn->proto.cid);
whdr.callNumber = 0;
whdr.seq = 0;
whdr.type = RXRPC_PACKET_TYPE_ABORT;
whdr.flags = conn->out_clientflag;
whdr.userStatus = 0;
whdr.securityIndex = conn->security_ix;
whdr._rsvd = 0;
whdr.serviceId = htons(conn->service_id);
word = htonl(conn->abort_code);
iov[0].iov_base = &whdr;
iov[0].iov_len = sizeof(whdr);
iov[1].iov_base = &word;
iov[1].iov_len = sizeof(word);
len = iov[0].iov_len + iov[1].iov_len;
serial = atomic_inc_return(&conn->serial);
whdr.serial = htonl(serial);
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
ret = do_udp_sendmsg(conn->local->socket, &msg, len);
if (ret < 0) {
trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
rxrpc_tx_point_conn_abort);
_debug("sendmsg failed: %d", ret);
return;
}
trace_rxrpc_tx_packet(conn->debug_id, &whdr, rxrpc_tx_point_conn_abort);
conn->peer->last_tx_at = ktime_get_seconds();
}
/*
* Reject a packet through the local endpoint.
*/
......@@ -667,7 +723,7 @@ void rxrpc_send_keepalive(struct rxrpc_peer *peer)
static inline void rxrpc_instant_resend(struct rxrpc_call *call,
struct rxrpc_txbuf *txb)
{
if (call->state < RXRPC_CALL_COMPLETE)
if (!__rxrpc_call_is_complete(call))
kdebug("resend");
}
......
......@@ -147,10 +147,10 @@ struct rxrpc_peer *rxrpc_lookup_peer_rcu(struct rxrpc_local *local,
* assess the MTU size for the network interface through which this peer is
* reached
*/
static void rxrpc_assess_MTU_size(struct rxrpc_sock *rx,
static void rxrpc_assess_MTU_size(struct rxrpc_local *local,
struct rxrpc_peer *peer)
{
struct net *net = sock_net(&rx->sk);
struct net *net = local->net;
struct dst_entry *dst;
struct rtable *rt;
struct flowi fl;
......@@ -236,11 +236,11 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp,
/*
* Initialise peer record.
*/
static void rxrpc_init_peer(struct rxrpc_sock *rx, struct rxrpc_peer *peer,
static void rxrpc_init_peer(struct rxrpc_local *local, struct rxrpc_peer *peer,
unsigned long hash_key)
{
peer->hash_key = hash_key;
rxrpc_assess_MTU_size(rx, peer);
rxrpc_assess_MTU_size(local, peer);
peer->mtu = peer->if_mtu;
peer->rtt_last_req = ktime_get_real();
......@@ -272,8 +272,7 @@ static void rxrpc_init_peer(struct rxrpc_sock *rx, struct rxrpc_peer *peer,
/*
* Set up a new peer.
*/
static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_sock *rx,
struct rxrpc_local *local,
static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_local *local,
struct sockaddr_rxrpc *srx,
unsigned long hash_key,
gfp_t gfp)
......@@ -285,7 +284,7 @@ static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_sock *rx,
peer = rxrpc_alloc_peer(local, gfp, rxrpc_peer_new_client);
if (peer) {
memcpy(&peer->srx, srx, sizeof(*srx));
rxrpc_init_peer(rx, peer, hash_key);
rxrpc_init_peer(local, peer, hash_key);
}
_leave(" = %p", peer);
......@@ -304,14 +303,13 @@ static void rxrpc_free_peer(struct rxrpc_peer *peer)
* since we've already done a search in the list from the non-reentrant context
* (the data_ready handler) that is the only place we can add new peers.
*/
void rxrpc_new_incoming_peer(struct rxrpc_sock *rx, struct rxrpc_local *local,
struct rxrpc_peer *peer)
void rxrpc_new_incoming_peer(struct rxrpc_local *local, struct rxrpc_peer *peer)
{
struct rxrpc_net *rxnet = local->rxnet;
unsigned long hash_key;
hash_key = rxrpc_peer_hash_key(local, &peer->srx);
rxrpc_init_peer(rx, peer, hash_key);
rxrpc_init_peer(local, peer, hash_key);
spin_lock(&rxnet->peer_hash_lock);
hash_add_rcu(rxnet->peer_hash, &peer->hash_link, hash_key);
......@@ -322,8 +320,7 @@ void rxrpc_new_incoming_peer(struct rxrpc_sock *rx, struct rxrpc_local *local,
/*
* obtain a remote transport endpoint for the specified address
*/
struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_sock *rx,
struct rxrpc_local *local,
struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_local *local,
struct sockaddr_rxrpc *srx, gfp_t gfp)
{
struct rxrpc_peer *peer, *candidate;
......@@ -343,7 +340,7 @@ struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_sock *rx,
/* The peer is not yet present in hash - create a candidate
* for a new record and then redo the search.
*/
candidate = rxrpc_create_peer(rx, local, srx, hash_key, gfp);
candidate = rxrpc_create_peer(local, srx, hash_key, gfp);
if (!candidate) {
_leave(" = NULL [nomem]");
return NULL;
......
......@@ -12,13 +12,13 @@
static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
[RXRPC_CONN_UNUSED] = "Unused ",
[RXRPC_CONN_CLIENT_UNSECURED] = "ClUnsec ",
[RXRPC_CONN_CLIENT] = "Client ",
[RXRPC_CONN_SERVICE_PREALLOC] = "SvPrealc",
[RXRPC_CONN_SERVICE_UNSECURED] = "SvUnsec ",
[RXRPC_CONN_SERVICE_CHALLENGING] = "SvChall ",
[RXRPC_CONN_SERVICE] = "SvSecure",
[RXRPC_CONN_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CONN_LOCALLY_ABORTED] = "LocAbort",
[RXRPC_CONN_ABORTED] = "Aborted ",
};
/*
......@@ -51,6 +51,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
struct rxrpc_local *local;
struct rxrpc_call *call;
struct rxrpc_net *rxnet = rxrpc_net(seq_file_net(seq));
enum rxrpc_call_state state;
unsigned long timeout = 0;
rxrpc_seq_t acks_hard_ack;
char lbuff[50], rbuff[50];
......@@ -75,7 +76,8 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
sprintf(rbuff, "%pISpc", &call->dest_srx.transport);
if (call->state != RXRPC_CALL_SERVER_PREALLOC) {
state = rxrpc_call_state(call);
if (state != RXRPC_CALL_SERVER_PREALLOC) {
timeout = READ_ONCE(call->expect_rx_by);
timeout -= jiffies;
}
......@@ -92,7 +94,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
call->call_id,
rxrpc_is_service_call(call) ? "Svc" : "Clt",
refcount_read(&call->ref),
rxrpc_call_states[call->state],
rxrpc_call_states[state],
call->abort_code,
call->debug_id,
acks_hard_ack, READ_ONCE(call->tx_top) - acks_hard_ack,
......@@ -143,6 +145,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
{
struct rxrpc_connection *conn;
struct rxrpc_net *rxnet = rxrpc_net(seq_file_net(seq));
const char *state;
char lbuff[50], rbuff[50];
if (v == &rxnet->conn_proc_list) {
......@@ -163,9 +166,11 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
}
sprintf(lbuff, "%pISpc", &conn->local->srx.transport);
sprintf(rbuff, "%pISpc", &conn->peer->srx.transport);
print:
state = rxrpc_is_conn_aborted(conn) ?
rxrpc_call_completions[conn->completion] :
rxrpc_conn_states[conn->state];
seq_printf(seq,
"UDP %-47.47s %-47.47s %4x %08x %s %3u %3d"
" %s %08x %08x %08x %08x %08x %08x %08x\n",
......@@ -176,7 +181,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
rxrpc_conn_is_service(conn) ? "Svc" : "Clt",
refcount_read(&conn->ref),
atomic_read(&conn->active),
rxrpc_conn_states[conn->state],
state,
key_serial(conn->key),
atomic_read(&conn->serial),
conn->hi_serial,
......
......@@ -58,85 +58,6 @@ void rxrpc_notify_socket(struct rxrpc_call *call)
_leave("");
}
/*
* Transition a call to the complete state.
*/
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
if (call->state < RXRPC_CALL_COMPLETE) {
call->abort_code = abort_code;
call->error = error;
call->completion = compl;
call->state = RXRPC_CALL_COMPLETE;
trace_rxrpc_call_complete(call);
wake_up(&call->waitq);
rxrpc_notify_socket(call);
return true;
}
return false;
}
bool rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
bool ret = false;
if (call->state < RXRPC_CALL_COMPLETE) {
write_lock(&call->state_lock);
ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
write_unlock(&call->state_lock);
}
return ret;
}
/*
* Record that a call successfully completed.
*/
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}
bool rxrpc_call_completed(struct rxrpc_call *call)
{
bool ret = false;
if (call->state < RXRPC_CALL_COMPLETE) {
write_lock(&call->state_lock);
ret = __rxrpc_call_completed(call);
write_unlock(&call->state_lock);
}
return ret;
}
/*
* Record that a call is locally aborted.
*/
bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
rxrpc_seq_t seq, u32 abort_code, int error)
{
trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
abort_code, error);
return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
abort_code, error);
}
bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
rxrpc_seq_t seq, u32 abort_code, int error)
{
bool ret;
write_lock(&call->state_lock);
ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
write_unlock(&call->state_lock);
return ret;
}
/*
* Pass a call terminating message to userspace.
*/
......@@ -168,7 +89,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
break;
default:
pr_err("Invalid terminal call state %u\n", call->state);
pr_err("Invalid terminal call state %u\n", call->completion);
BUG();
break;
}
......@@ -179,41 +100,6 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
return ret;
}
/*
* End the packet reception phase.
*/
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
write_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
__rxrpc_call_completed(call);
write_unlock(&call->state_lock);
break;
case RXRPC_CALL_SERVER_RECV_REQUEST:
call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
write_unlock(&call->state_lock);
rxrpc_propose_delay_ACK(call, serial,
rxrpc_propose_ack_processing_op);
break;
default:
write_unlock(&call->state_lock);
break;
}
}
/*
* Discard a packet we've used up and advance the Rx window by one.
*/
......@@ -244,10 +130,9 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
serial, call->rx_consumed);
if (last) {
rxrpc_end_rx_phase(call, serial);
return;
}
if (last)
set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
/* Check to see if there's an ACK that needs sending. */
acked = atomic_add_return(call->rx_consumed - old_consumed,
......@@ -272,7 +157,8 @@ static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
/*
* Deliver messages to a call. This keeps processing packets until the buffer
* is filled and we find either more DATA (returns 0) or the end of the DATA
* (returns 1). If more packets are required, it returns -EAGAIN.
* (returns 1). If more packets are required, it returns -EAGAIN and if the
* call has failed it returns -EIO.
*/
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
struct msghdr *msg, struct iov_iter *iter,
......@@ -288,7 +174,13 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;
if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
if (rxrpc_call_has_failed(call)) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = -EIO;
goto done;
}
if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1;
goto done;
......@@ -312,14 +204,15 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
if (rx_pkt_offset == 0) {
ret2 = rxrpc_verify_data(call, skb);
rx_pkt_offset = sp->offset;
rx_pkt_len = sp->len;
trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
rx_pkt_offset, rx_pkt_len, ret2);
sp->offset, sp->len, ret2);
if (ret2 < 0) {
kdebug("verify = %d", ret2);
ret = ret2;
goto out;
}
rx_pkt_offset = sp->offset;
rx_pkt_len = sp->len;
} else {
trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
rx_pkt_offset, rx_pkt_len, 0);
......@@ -494,36 +387,36 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
msg->msg_namelen = len;
}
switch (READ_ONCE(call->state)) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
flags, &copied);
if (ret == -EAGAIN)
ret = 0;
if (!skb_queue_empty(&call->recvmsg_queue))
rxrpc_notify_socket(call);
break;
default:
ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
flags, &copied);
if (ret == -EAGAIN)
ret = 0;
break;
}
if (ret == -EIO)
goto call_failed;
if (ret < 0)
goto error_unlock_call;
if (call->state == RXRPC_CALL_COMPLETE) {
ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0)
goto error_unlock_call;
if (!(flags & MSG_PEEK))
rxrpc_release_call(rx, call);
msg->msg_flags |= MSG_EOR;
ret = 1;
}
if (rxrpc_call_is_complete(call) &&
skb_queue_empty(&call->recvmsg_queue))
goto call_complete;
if (rxrpc_call_has_failed(call))
goto call_failed;
rxrpc_notify_socket(call);
goto not_yet_complete;
call_failed:
rxrpc_purge_queue(&call->recvmsg_queue);
call_complete:
ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0)
goto error_unlock_call;
if (!(flags & MSG_PEEK))
rxrpc_release_call(rx, call);
msg->msg_flags |= MSG_EOR;
ret = 1;
not_yet_complete:
if (ret == 0)
msg->msg_flags |= MSG_MORE;
else
......@@ -586,49 +479,34 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
size_t offset = 0;
int ret;
_enter("{%d,%s},%zu,%d",
call->debug_id, rxrpc_call_states[call->state],
*_len, want_more);
ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);
_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
mutex_lock(&call->user_mutex);
switch (READ_ONCE(call->state)) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
*_len, 0, &offset);
*_len -= offset;
if (ret < 0)
goto out;
/* We can only reach here with a partially full buffer if we
* have reached the end of the data. We must otherwise have a
* full buffer or have been given -EAGAIN.
*/
if (ret == 1) {
if (iov_iter_count(iter) > 0)
goto short_data;
if (!want_more)
goto read_phase_complete;
ret = 0;
goto out;
}
if (!want_more)
goto excess_data;
ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
*_len -= offset;
if (ret == -EIO)
goto call_failed;
if (ret < 0)
goto out;
case RXRPC_CALL_COMPLETE:
goto call_complete;
default:
ret = -EINPROGRESS;
/* We can only reach here with a partially full buffer if we have
* reached the end of the data. We must otherwise have a full buffer
* or have been given -EAGAIN.
*/
if (ret == 1) {
if (iov_iter_count(iter) > 0)
goto short_data;
if (!want_more)
goto read_phase_complete;
ret = 0;
goto out;
}
if (!want_more)
goto excess_data;
goto out;
read_phase_complete:
ret = 1;
out:
......@@ -639,14 +517,18 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
return ret;
short_data:
trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
call->cid, call->call_id, call->rx_consumed,
0, -EBADMSG);
ret = -EBADMSG;
goto out;
excess_data:
trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
call->cid, call->call_id, call->rx_consumed,
0, -EMSGSIZE);
ret = -EMSGSIZE;
goto out;
call_complete:
call_failed:
*_abort = call->abort_code;
ret = call->error;
if (call->completion == RXRPC_CALL_SUCCEEDED) {
......
This diff is collapsed.
......@@ -10,6 +10,8 @@
#include <linux/slab.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
#include <trace/events/rxrpc.h>
MODULE_DESCRIPTION("rxperf test server (afs)");
MODULE_AUTHOR("Red Hat, Inc.");
......@@ -307,12 +309,14 @@ static void rxperf_deliver_to_call(struct work_struct *work)
case -EOPNOTSUPP:
abort_code = RXGEN_OPCODE;
rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
abort_code, ret, "GOP");
abort_code, ret,
rxperf_abort_op_not_supported);
goto call_complete;
case -ENOTSUPP:
abort_code = RX_USER_ABORT;
rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
abort_code, ret, "GUA");
abort_code, ret,
rxperf_abort_op_not_supported);
goto call_complete;
case -EIO:
pr_err("Call %u in bad state %u\n",
......@@ -324,11 +328,13 @@ static void rxperf_deliver_to_call(struct work_struct *work)
case -ENOMEM:
case -EFAULT:
rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
RXGEN_SS_UNMARSHAL, ret, "GUM");
RXGEN_SS_UNMARSHAL, ret,
rxperf_abort_unmarshal_error);
goto call_complete;
default:
rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
RX_CALL_DEAD, ret, "GER");
RX_CALL_DEAD, ret,
rxperf_abort_general_error);
goto call_complete;
}
}
......@@ -523,7 +529,8 @@ static int rxperf_process_call(struct rxperf_call *call)
if (n == -ENOMEM)
rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
RXGEN_SS_MARSHAL, -ENOMEM, "GOM");
RXGEN_SS_MARSHAL, -ENOMEM,
rxperf_abort_oom);
return n;
}
......
......@@ -97,38 +97,31 @@ int rxrpc_init_client_call_security(struct rxrpc_call *call)
*/
int rxrpc_init_client_conn_security(struct rxrpc_connection *conn)
{
const struct rxrpc_security *sec;
struct rxrpc_key_token *token;
struct key *key = conn->key;
int ret;
int ret = 0;
_enter("{%d},{%x}", conn->debug_id, key_serial(key));
if (!key)
return 0;
ret = key_validate(key);
if (ret < 0)
return ret;
for (token = key->payload.data[0]; token; token = token->next) {
sec = rxrpc_security_lookup(token->security_index);
if (sec)
if (token->security_index == conn->security->security_index)
goto found;
}
return -EKEYREJECTED;
found:
conn->security = sec;
ret = conn->security->init_connection_security(conn, token);
if (ret < 0) {
conn->security = &rxrpc_no_security;
return ret;
mutex_lock(&conn->security_lock);
if (conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
ret = conn->security->init_connection_security(conn, token);
if (ret == 0) {
spin_lock(&conn->state_lock);
if (conn->state == RXRPC_CONN_CLIENT_UNSECURED)
conn->state = RXRPC_CONN_CLIENT;
spin_unlock(&conn->state_lock);
}
}
_leave(" = 0");
return 0;
mutex_unlock(&conn->security_lock);
return ret;
}
/*
......@@ -144,21 +137,15 @@ const struct rxrpc_security *rxrpc_get_incoming_security(struct rxrpc_sock *rx,
sec = rxrpc_security_lookup(sp->hdr.securityIndex);
if (!sec) {
trace_rxrpc_abort(0, "SVS",
sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_INVALID_OPERATION, EKEYREJECTED);
skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
skb->priority = RX_INVALID_OPERATION;
rxrpc_direct_abort(skb, rxrpc_abort_unsupported_security,
RX_INVALID_OPERATION, -EKEYREJECTED);
return NULL;
}
if (sp->hdr.securityIndex != RXRPC_SECURITY_NONE &&
!rx->securities) {
trace_rxrpc_abort(0, "SVR",
sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_INVALID_OPERATION, EKEYREJECTED);
skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
skb->priority = sec->no_key_abort;
rxrpc_direct_abort(skb, rxrpc_abort_no_service_key,
sec->no_key_abort, -EKEYREJECTED);
return NULL;
}
......@@ -191,9 +178,9 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *conn,
sprintf(kdesc, "%u:%u",
sp->hdr.serviceId, sp->hdr.securityIndex);
rcu_read_lock();
read_lock(&conn->local->services_lock);
rx = rcu_dereference(conn->local->service);
rx = conn->local->service;
if (!rx)
goto out;
......@@ -215,6 +202,6 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *conn,
}
out:
rcu_read_unlock();
read_unlock(&conn->local->services_lock);
return key;
}
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment