Commit 38f080f3 authored by Chuck Lever

NFSD: Move callback_wq into struct nfs4_client

Commit 88382036 ("nfsd: update workqueue creation") made the
callback_wq single-threaded, presumably to protect modifications of
cl_cb_client. See documenting comment for nfsd4_process_cb_update().

However, cl_cb_client is per-lease. There's no other reason that all
callback operations need to be dispatched via a single thread. The
single threading here means all client callbacks can be blocked by a
problem with one client.

Change the NFSv4 callback client so it serializes per-lease instead
of serializing all NFSv4 callback operations on the server.
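
To illustrate the behavior this relies on (a sketch for context, not part of the patch): an ordered workqueue executes at most one work item at a time, in queueing order, so giving each nfs4_client its own queue serializes that client's callbacks while other clients' queues make progress independently. The demo_* names below are hypothetical; the workqueue calls are the real kernel APIs the patch uses.

	#include <linux/workqueue.h>

	/*
	 * Sketch only: alloc_ordered_workqueue() creates a workqueue
	 * with max_active == 1, so work items queued to it run
	 * strictly one at a time, in queueing order.
	 */
	struct demo_client {
		struct workqueue_struct	*wq;
		struct work_struct	cb_work;
	};

	static int demo_client_init(struct demo_client *c)
	{
		c->wq = alloc_ordered_workqueue("demo_cb", 0);
		if (!c->wq)
			return -ENOMEM;
		return 0;
	}

	static bool demo_client_queue_cb(struct demo_client *c)
	{
		/*
		 * Serialized only against other work on c->wq; a
		 * stalled callback here cannot delay callbacks that
		 * belong to other clients on their own queues.
		 */
		return queue_work(c->wq, &c->cb_work);
	}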
Reported-by: Dai Ngo <dai.ngo@oracle.com>
Reviewed-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
parent 56c35f43
@@ -978,12 +978,12 @@ static int max_cb_time(struct net *net)
 	return max(((u32)nn->nfsd4_lease)/10, 1u) * HZ;
 }
 
-static struct workqueue_struct *callback_wq;
-
 static bool nfsd4_queue_cb(struct nfsd4_callback *cb)
 {
-	trace_nfsd_cb_queue(cb->cb_clp, cb);
-	return queue_work(callback_wq, &cb->cb_work);
+	struct nfs4_client *clp = cb->cb_clp;
+
+	trace_nfsd_cb_queue(clp, cb);
+	return queue_work(clp->cl_callback_wq, &cb->cb_work);
 }
 
 static void nfsd41_cb_inflight_begin(struct nfs4_client *clp)
@@ -1153,7 +1153,7 @@ void nfsd4_probe_callback(struct nfs4_client *clp)
 void nfsd4_probe_callback_sync(struct nfs4_client *clp)
 {
 	nfsd4_probe_callback(clp);
-	flush_workqueue(callback_wq);
+	flush_workqueue(clp->cl_callback_wq);
 }
 
 void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
@@ -1372,19 +1372,6 @@ static const struct rpc_call_ops nfsd4_cb_ops = {
 	.rpc_release = nfsd4_cb_release,
 };
 
-int nfsd4_create_callback_queue(void)
-{
-	callback_wq = alloc_ordered_workqueue("nfsd4_callbacks", 0);
-	if (!callback_wq)
-		return -ENOMEM;
-	return 0;
-}
-
-void nfsd4_destroy_callback_queue(void)
-{
-	destroy_workqueue(callback_wq);
-}
-
 /* must be called under the state lock */
 void nfsd4_shutdown_callback(struct nfs4_client *clp)
 {
@@ -1398,7 +1385,7 @@ void nfsd4_shutdown_callback(struct nfs4_client *clp)
 	 * client, destroy the rpc client, and stop:
 	 */
 	nfsd4_run_cb(&clp->cl_cb_null);
-	flush_workqueue(callback_wq);
+	flush_workqueue(clp->cl_callback_wq);
 	nfsd41_cb_inflight_wait_complete(clp);
 }
@@ -1420,9 +1407,9 @@ static struct nfsd4_conn * __nfsd4_find_backchannel(struct nfs4_client *clp)
 /*
  * Note there isn't a lot of locking in this code; instead we depend on
- * the fact that it is run from the callback_wq, which won't run two
- * work items at once. So, for example, callback_wq handles all access
- * of cl_cb_client and all calls to rpc_create or rpc_shutdown_client.
+ * the fact that it is run from clp->cl_callback_wq, which won't run two
+ * work items at once. So, for example, clp->cl_callback_wq handles all
+ * access of cl_cb_client and all calls to rpc_create or rpc_shutdown_client.
  */
 static void nfsd4_process_cb_update(struct nfsd4_callback *cb)
 {
@@ -2233,6 +2233,10 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name,
 			GFP_KERNEL);
 	if (!clp->cl_ownerstr_hashtbl)
 		goto err_no_hashtbl;
+	clp->cl_callback_wq = alloc_ordered_workqueue("nfsd4_callbacks", 0);
+	if (!clp->cl_callback_wq)
+		goto err_no_callback_wq;
+
 	for (i = 0; i < OWNER_HASH_SIZE; i++)
 		INIT_LIST_HEAD(&clp->cl_ownerstr_hashtbl[i]);
 	INIT_LIST_HEAD(&clp->cl_sessions);
@@ -2255,6 +2259,8 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name,
 	spin_lock_init(&clp->cl_lock);
 	rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
 	return clp;
+err_no_callback_wq:
+	kfree(clp->cl_ownerstr_hashtbl);
 err_no_hashtbl:
 	kfree(clp->cl_name.data);
 err_no_name:
@@ -2268,6 +2274,7 @@ static void __free_client(struct kref *k)
 	struct nfs4_client *clp = container_of(c, struct nfs4_client, cl_nfsdfs);
 
 	free_svc_cred(&clp->cl_cred);
+	destroy_workqueue(clp->cl_callback_wq);
 	kfree(clp->cl_ownerstr_hashtbl);
 	kfree(clp->cl_name.data);
 	kfree(clp->cl_nii_domain.data);
@@ -8636,12 +8643,6 @@ nfs4_state_start(void)
 	if (ret)
 		return ret;
 
-	ret = nfsd4_create_callback_queue();
-	if (ret) {
-		rhltable_destroy(&nfs4_file_rhltable);
-		return ret;
-	}
-
 	set_max_delegations();
 	return 0;
 }
@@ -8682,7 +8683,6 @@ nfs4_state_shutdown_net(struct net *net)
 void
 nfs4_state_shutdown(void)
 {
-	nfsd4_destroy_callback_queue();
 	rhltable_destroy(&nfs4_file_rhltable);
 }
@@ -408,6 +408,8 @@ struct nfs4_client {
 					1 << NFSD4_CLIENT_CB_KILL)
 #define NFSD4_CLIENT_CB_RECALL_ANY	(6)
 	unsigned long		cl_flags;
+
+	struct workqueue_struct *cl_callback_wq;
 	const struct cred	*cl_cb_cred;
 	struct rpc_clnt		*cl_cb_client;
 	u32			cl_cb_ident;
@@ -735,8 +737,6 @@ extern void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *
 extern void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
 		const struct nfsd4_callback_ops *ops, enum nfsd4_cb_op op);
 extern bool nfsd4_run_cb(struct nfsd4_callback *cb);
-extern int nfsd4_create_callback_queue(void);
-extern void nfsd4_destroy_callback_queue(void);
 extern void nfsd4_shutdown_callback(struct nfs4_client *);
 extern void nfsd4_shutdown_copy(struct nfs4_client *clp);
 extern struct nfs4_client_reclaim *nfs4_client_to_reclaim(struct xdr_netobj name,