Commit 341f741f authored by David Howells's avatar David Howells

afs: Refcount the afs_call struct

A static checker warning occurs in the AFS filesystem:

	fs/afs/cmservice.c:155 SRXAFSCB_CallBack()
	error: dereferencing freed memory 'call'

due to the reply being sent before we access the server it points to.  The
act of sending the reply causes the call to be freed if an error occurs
(but not if it doesn't).

On top of this, the lifetime handling of afs_call structs is fragile
because they get passed around through workqueues without any sort of
refcounting.

Deal with the issues by:

 (1) Fix the maybe/maybe not nature of the reply sending functions with
     regards to whether they release the call struct.

 (2) Refcount the afs_call struct and sort out places that need to get/put
     references.

 (3) Pass a ref through the work queue and release (or pass on) that ref in
     the work function.  Care has to be taken because a work queue may
     already own a ref to the call.

 (4) Do the cleaning up in the put function only.

 (5) Simplify module cleanup by always incrementing afs_outstanding_calls
     whenever a call is allocated.

 (6) Set the backlog to 0 with kernel_listen() at the beginning of the
     process of closing the socket to prevent new incoming calls from
     occurring and to remove the contribution of preallocated calls from
     afs_outstanding_calls before we wait on it.

A tracepoint is also added to monitor the afs_call refcount and lifetime.
Reported-by: default avatarDan Carpenter <dan.carpenter@oracle.com>
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
Fixes: 08e0e7c8: "[AF_RXRPC]: Make the in-kernel AFS filesystem use AF_RXRPC."
parent 210f0353
......@@ -24,6 +24,11 @@ static int afs_deliver_cb_callback(struct afs_call *);
static int afs_deliver_cb_probe_uuid(struct afs_call *);
static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *);
static void afs_cm_destructor(struct afs_call *);
static void SRXAFSCB_CallBack(struct work_struct *);
static void SRXAFSCB_InitCallBackState(struct work_struct *);
static void SRXAFSCB_Probe(struct work_struct *);
static void SRXAFSCB_ProbeUuid(struct work_struct *);
static void SRXAFSCB_TellMeAboutYourself(struct work_struct *);
#define CM_NAME(name) \
const char afs_SRXCB##name##_name[] __tracepoint_string = \
......@@ -38,6 +43,7 @@ static const struct afs_call_type afs_SRXCBCallBack = {
.deliver = afs_deliver_cb_callback,
.abort_to_error = afs_abort_to_error,
.destructor = afs_cm_destructor,
.work = SRXAFSCB_CallBack,
};
/*
......@@ -49,6 +55,7 @@ static const struct afs_call_type afs_SRXCBInitCallBackState = {
.deliver = afs_deliver_cb_init_call_back_state,
.abort_to_error = afs_abort_to_error,
.destructor = afs_cm_destructor,
.work = SRXAFSCB_InitCallBackState,
};
/*
......@@ -60,6 +67,7 @@ static const struct afs_call_type afs_SRXCBInitCallBackState3 = {
.deliver = afs_deliver_cb_init_call_back_state3,
.abort_to_error = afs_abort_to_error,
.destructor = afs_cm_destructor,
.work = SRXAFSCB_InitCallBackState,
};
/*
......@@ -71,6 +79,7 @@ static const struct afs_call_type afs_SRXCBProbe = {
.deliver = afs_deliver_cb_probe,
.abort_to_error = afs_abort_to_error,
.destructor = afs_cm_destructor,
.work = SRXAFSCB_Probe,
};
/*
......@@ -82,6 +91,7 @@ static const struct afs_call_type afs_SRXCBProbeUuid = {
.deliver = afs_deliver_cb_probe_uuid,
.abort_to_error = afs_abort_to_error,
.destructor = afs_cm_destructor,
.work = SRXAFSCB_ProbeUuid,
};
/*
......@@ -93,6 +103,7 @@ static const struct afs_call_type afs_SRXCBTellMeAboutYourself = {
.deliver = afs_deliver_cb_tell_me_about_yourself,
.abort_to_error = afs_abort_to_error,
.destructor = afs_cm_destructor,
.work = SRXAFSCB_TellMeAboutYourself,
};
/*
......@@ -163,6 +174,7 @@ static void SRXAFSCB_CallBack(struct work_struct *work)
afs_send_empty_reply(call);
afs_break_callbacks(call->server, call->count, call->request);
afs_put_call(call);
_leave("");
}
......@@ -284,9 +296,7 @@ static int afs_deliver_cb_callback(struct afs_call *call)
return -ENOTCONN;
call->server = server;
INIT_WORK(&call->work, SRXAFSCB_CallBack);
queue_work(afs_wq, &call->work);
return 0;
return afs_queue_call_work(call);
}
/*
......@@ -300,6 +310,7 @@ static void SRXAFSCB_InitCallBackState(struct work_struct *work)
afs_init_callback_state(call->server);
afs_send_empty_reply(call);
afs_put_call(call);
_leave("");
}
......@@ -330,9 +341,7 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call)
return -ENOTCONN;
call->server = server;
INIT_WORK(&call->work, SRXAFSCB_InitCallBackState);
queue_work(afs_wq, &call->work);
return 0;
return afs_queue_call_work(call);
}
/*
......@@ -404,9 +413,7 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call)
return -ENOTCONN;
call->server = server;
INIT_WORK(&call->work, SRXAFSCB_InitCallBackState);
queue_work(afs_wq, &call->work);
return 0;
return afs_queue_call_work(call);
}
/*
......@@ -418,6 +425,7 @@ static void SRXAFSCB_Probe(struct work_struct *work)
_enter("");
afs_send_empty_reply(call);
afs_put_call(call);
_leave("");
}
......@@ -437,9 +445,7 @@ static int afs_deliver_cb_probe(struct afs_call *call)
/* no unmarshalling required */
call->state = AFS_CALL_REPLYING;
INIT_WORK(&call->work, SRXAFSCB_Probe);
queue_work(afs_wq, &call->work);
return 0;
return afs_queue_call_work(call);
}
/*
......@@ -462,6 +468,7 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
reply.match = htonl(1);
afs_send_simple_reply(call, &reply, sizeof(reply));
afs_put_call(call);
_leave("");
}
......@@ -520,9 +527,7 @@ static int afs_deliver_cb_probe_uuid(struct afs_call *call)
call->state = AFS_CALL_REPLYING;
INIT_WORK(&call->work, SRXAFSCB_ProbeUuid);
queue_work(afs_wq, &call->work);
return 0;
return afs_queue_call_work(call);
}
/*
......@@ -584,7 +589,7 @@ static void SRXAFSCB_TellMeAboutYourself(struct work_struct *work)
reply.cap.capcount = htonl(1);
reply.cap.caps[0] = htonl(AFS_CAP_ERROR_TRANSLATION);
afs_send_simple_reply(call, &reply, sizeof(reply));
afs_put_call(call);
_leave("");
}
......@@ -604,7 +609,5 @@ static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *call)
/* no unmarshalling required */
call->state = AFS_CALL_REPLYING;
INIT_WORK(&call->work, SRXAFSCB_TellMeAboutYourself);
queue_work(afs_wq, &call->work);
return 0;
return afs_queue_call_work(call);
}
......@@ -66,7 +66,7 @@ enum afs_call_state {
struct afs_call {
const struct afs_call_type *type; /* type of call */
wait_queue_head_t waitq; /* processes awaiting completion */
struct work_struct async_work; /* asynchronous work processor */
struct work_struct async_work; /* async I/O processor */
struct work_struct work; /* actual work processor */
struct rxrpc_call *rxcall; /* RxRPC call handle */
struct key *key; /* security for this call */
......@@ -82,6 +82,7 @@ struct afs_call {
pgoff_t first; /* first page in mapping to deal with */
pgoff_t last; /* last page in mapping to deal with */
size_t offset; /* offset into received data store */
atomic_t usage;
enum afs_call_state state;
int error; /* error code */
u32 abort_code; /* Remote abort ID or 0 */
......@@ -115,6 +116,9 @@ struct afs_call_type {
/* clean up a call */
void (*destructor)(struct afs_call *call);
/* Work function */
void (*work)(struct work_struct *work);
};
/*
......@@ -591,9 +595,12 @@ extern void afs_proc_cell_remove(struct afs_cell *);
* rxrpc.c
*/
extern struct socket *afs_socket;
extern atomic_t afs_outstanding_calls;
extern int afs_open_socket(void);
extern void afs_close_socket(void);
extern void afs_put_call(struct afs_call *);
extern int afs_queue_call_work(struct afs_call *);
extern int afs_make_call(struct in_addr *, struct afs_call *, gfp_t, bool);
extern struct afs_call *afs_alloc_flat_call(const struct afs_call_type *,
size_t, size_t);
......
......@@ -19,9 +19,8 @@
struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static struct afs_call *afs_spare_incoming_call;
static atomic_t afs_outstanding_calls;
atomic_t afs_outstanding_calls;
static void afs_free_call(struct afs_call *);
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
......@@ -112,9 +111,11 @@ void afs_close_socket(void)
{
_enter("");
kernel_listen(afs_socket, 0);
flush_workqueue(afs_async_calls);
if (afs_spare_incoming_call) {
atomic_inc(&afs_outstanding_calls);
afs_free_call(afs_spare_incoming_call);
afs_put_call(afs_spare_incoming_call);
afs_spare_incoming_call = NULL;
}
......@@ -123,7 +124,6 @@ void afs_close_socket(void)
TASK_UNINTERRUPTIBLE);
_debug("no outstanding calls");
flush_workqueue(afs_async_calls);
kernel_sock_shutdown(afs_socket, SHUT_RDWR);
flush_workqueue(afs_async_calls);
sock_release(afs_socket);
......@@ -134,44 +134,79 @@ void afs_close_socket(void)
}
/*
* free a call
* Allocate a call.
*/
static void afs_free_call(struct afs_call *call)
static struct afs_call *afs_alloc_call(const struct afs_call_type *type,
gfp_t gfp)
{
_debug("DONE %p{%s} [%d]",
call, call->type->name, atomic_read(&afs_outstanding_calls));
struct afs_call *call;
int o;
ASSERTCMP(call->rxcall, ==, NULL);
ASSERT(!work_pending(&call->async_work));
ASSERT(call->type->name != NULL);
call = kzalloc(sizeof(*call), gfp);
if (!call)
return NULL;
kfree(call->request);
kfree(call);
call->type = type;
atomic_set(&call->usage, 1);
INIT_WORK(&call->async_work, afs_process_async_call);
init_waitqueue_head(&call->waitq);
if (atomic_dec_and_test(&afs_outstanding_calls))
wake_up_atomic_t(&afs_outstanding_calls);
o = atomic_inc_return(&afs_outstanding_calls);
trace_afs_call(call, afs_call_trace_alloc, 1, o,
__builtin_return_address(0));
return call;
}
/*
* End a call but do not free it
* Dispose of a reference on a call.
*/
static void afs_end_call_nofree(struct afs_call *call)
void afs_put_call(struct afs_call *call)
{
if (call->rxcall) {
rxrpc_kernel_end_call(afs_socket, call->rxcall);
call->rxcall = NULL;
int n = atomic_dec_return(&call->usage);
int o = atomic_read(&afs_outstanding_calls);
trace_afs_call(call, afs_call_trace_put, n + 1, o,
__builtin_return_address(0));
ASSERTCMP(n, >=, 0);
if (n == 0) {
ASSERT(!work_pending(&call->async_work));
ASSERT(call->type->name != NULL);
if (call->rxcall) {
rxrpc_kernel_end_call(afs_socket, call->rxcall);
call->rxcall = NULL;
}
if (call->type->destructor)
call->type->destructor(call);
kfree(call->request);
kfree(call);
o = atomic_dec_return(&afs_outstanding_calls);
trace_afs_call(call, afs_call_trace_free, 0, o,
__builtin_return_address(0));
if (o == 0)
wake_up_atomic_t(&afs_outstanding_calls);
}
if (call->type->destructor)
call->type->destructor(call);
}
/*
* End a call and free it
* Queue the call for actual work. Returns 0 unconditionally for convenience.
*/
static void afs_end_call(struct afs_call *call)
int afs_queue_call_work(struct afs_call *call)
{
afs_end_call_nofree(call);
afs_free_call(call);
int u = atomic_inc_return(&call->usage);
trace_afs_call(call, afs_call_trace_work, u,
atomic_read(&afs_outstanding_calls),
__builtin_return_address(0));
INIT_WORK(&call->work, call->type->work);
if (!queue_work(afs_wq, &call->work))
afs_put_call(call);
return 0;
}
/*
......@@ -182,25 +217,19 @@ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
{
struct afs_call *call;
call = kzalloc(sizeof(*call), GFP_NOFS);
call = afs_alloc_call(type, GFP_NOFS);
if (!call)
goto nomem_call;
_debug("CALL %p{%s} [%d]",
call, type->name, atomic_read(&afs_outstanding_calls));
atomic_inc(&afs_outstanding_calls);
call->type = type;
call->request_size = request_size;
call->reply_max = reply_max;
if (request_size) {
call->request_size = request_size;
call->request = kmalloc(request_size, GFP_NOFS);
if (!call->request)
goto nomem_free;
}
if (reply_max) {
call->reply_max = reply_max;
call->buffer = kmalloc(reply_max, GFP_NOFS);
if (!call->buffer)
goto nomem_free;
......@@ -210,7 +239,7 @@ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
return call;
nomem_free:
afs_free_call(call);
afs_put_call(call);
nomem_call:
return NULL;
}
......@@ -315,7 +344,6 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
atomic_read(&afs_outstanding_calls));
call->async = async;
INIT_WORK(&call->async_work, afs_process_async_call);
memset(&srx, 0, sizeof(srx));
srx.srx_family = AF_RXRPC;
......@@ -378,7 +406,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
error_do_abort:
rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT, -ret, "KSD");
error_kill_call:
afs_end_call(call);
afs_put_call(call);
_leave(" = %d", ret);
return ret;
}
......@@ -448,7 +476,7 @@ static void afs_deliver_to_call(struct afs_call *call)
done:
if (call->state == AFS_CALL_COMPLETE && call->incoming)
afs_end_call(call);
afs_put_call(call);
out:
_leave("");
return;
......@@ -505,7 +533,7 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
}
_debug("call complete");
afs_end_call(call);
afs_put_call(call);
_leave(" = %d", ret);
return ret;
}
......@@ -529,14 +557,25 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
unsigned long call_user_ID)
{
struct afs_call *call = (struct afs_call *)call_user_ID;
int u;
trace_afs_notify_call(rxcall, call);
call->need_attention = true;
queue_work(afs_async_calls, &call->async_work);
u = __atomic_add_unless(&call->usage, 1, 0);
if (u != 0) {
trace_afs_call(call, afs_call_trace_wake, u,
atomic_read(&afs_outstanding_calls),
__builtin_return_address(0));
if (!queue_work(afs_async_calls, &call->async_work))
afs_put_call(call);
}
}
/*
* delete an asynchronous call
* Delete an asynchronous call. The work item carries a ref to the call struct
* that we need to release.
*/
static void afs_delete_async_call(struct work_struct *work)
{
......@@ -544,13 +583,14 @@ static void afs_delete_async_call(struct work_struct *work)
_enter("");
afs_free_call(call);
afs_put_call(call);
_leave("");
}
/*
* perform processing on an asynchronous call
* Perform I/O processing on an asynchronous call. The work item carries a ref
* to the call struct that we either need to release or to pass on.
*/
static void afs_process_async_call(struct work_struct *work)
{
......@@ -566,15 +606,16 @@ static void afs_process_async_call(struct work_struct *work)
if (call->state == AFS_CALL_COMPLETE) {
call->reply = NULL;
/* kill the call */
afs_end_call_nofree(call);
/* we can't just delete the call because the work item may be
* queued */
/* We have two refs to release - one from the alloc and one
* queued with the work item - and we can't just deallocate the
* call because the work item may be queued again.
*/
call->async_work.func = afs_delete_async_call;
queue_work(afs_async_calls, &call->async_work);
if (!queue_work(afs_async_calls, &call->async_work))
afs_put_call(call);
}
afs_put_call(call);
_leave("");
}
......@@ -594,12 +635,10 @@ static void afs_charge_preallocation(struct work_struct *work)
for (;;) {
if (!call) {
call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
call = afs_alloc_call(&afs_RXCMxxxx, GFP_KERNEL);
if (!call)
break;
INIT_WORK(&call->async_work, afs_process_async_call);
call->type = &afs_RXCMxxxx;
call->async = true;
call->state = AFS_CALL_AWAIT_OP_ID;
init_waitqueue_head(&call->waitq);
......@@ -624,9 +663,8 @@ static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
{
struct afs_call *call = (struct afs_call *)user_call_ID;
atomic_inc(&afs_outstanding_calls);
call->rxcall = NULL;
afs_free_call(call);
afs_put_call(call);
}
/*
......@@ -635,7 +673,6 @@ static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
unsigned long user_call_ID)
{
atomic_inc(&afs_outstanding_calls);
queue_work(afs_wq, &afs_charge_preallocation_work);
}
......@@ -699,7 +736,6 @@ void afs_send_empty_reply(struct afs_call *call)
rxrpc_kernel_abort_call(afs_socket, call->rxcall,
RX_USER_ABORT, ENOMEM, "KOO");
default:
afs_end_call(call);
_leave(" [error]");
return;
}
......@@ -738,7 +774,6 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
rxrpc_kernel_abort_call(afs_socket, call->rxcall,
RX_USER_ABORT, ENOMEM, "KOO");
}
afs_end_call(call);
_leave(" [error]");
}
......
......@@ -16,6 +16,51 @@
#include <linux/tracepoint.h>
/*
* Define enums for tracing information.
*/
#ifndef __AFS_DECLARE_TRACE_ENUMS_ONCE_ONLY
#define __AFS_DECLARE_TRACE_ENUMS_ONCE_ONLY
enum afs_call_trace {
afs_call_trace_alloc,
afs_call_trace_free,
afs_call_trace_put,
afs_call_trace_wake,
afs_call_trace_work,
};
#endif /* end __AFS_DECLARE_TRACE_ENUMS_ONCE_ONLY */
/*
* Declare tracing information enums and their string mappings for display.
*/
#define afs_call_traces \
EM(afs_call_trace_alloc, "ALLOC") \
EM(afs_call_trace_free, "FREE ") \
EM(afs_call_trace_put, "PUT ") \
EM(afs_call_trace_wake, "WAKE ") \
E_(afs_call_trace_work, "WORK ")
/*
* Export enum symbols via userspace.
*/
#undef EM
#undef E_
#define EM(a, b) TRACE_DEFINE_ENUM(a);
#define E_(a, b) TRACE_DEFINE_ENUM(a);
afs_call_traces;
/*
* Now redefine the EM() and E_() macros to map the enums to the strings that
* will be printed in the output.
*/
#undef EM
#undef E_
#define EM(a, b) { a, b },
#define E_(a, b) { a, b }
TRACE_EVENT(afs_recv_data,
TP_PROTO(struct afs_call *call, unsigned count, unsigned offset,
bool want_more, int ret),
......@@ -103,6 +148,36 @@ TRACE_EVENT(afs_cb_call,
__entry->op)
);
TRACE_EVENT(afs_call,
TP_PROTO(struct afs_call *call, enum afs_call_trace op,
int usage, int outstanding, const void *where),
TP_ARGS(call, op, usage, outstanding, where),
TP_STRUCT__entry(
__field(struct afs_call *, call )
__field(int, op )
__field(int, usage )
__field(int, outstanding )
__field(const void *, where )
),
TP_fast_assign(
__entry->call = call;
__entry->op = op;
__entry->usage = usage;
__entry->outstanding = outstanding;
__entry->where = where;
),
TP_printk("c=%p %s u=%d o=%d sp=%pSR",
__entry->call,
__print_symbolic(__entry->op, afs_call_traces),
__entry->usage,
__entry->outstanding,
__entry->where)
);
#endif /* _TRACE_AFS_H */
/* This part must be outside protection */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment