Commit 77d79030 authored by Trond Myklebust's avatar Trond Myklebust

[PATCH] RPC over UDP congestion control updates [1/8]

Implement the basic round trip timing algorithm in order to adapt the
timeout values for the most common NFS operations to the server's
rate of response.
The algorithm is described in Van Jacobson's 1998 paper
on http://www-nrg.ee.lbl.gov/nrg-papers.html, and is the same as is
used for most TCP stacks.

Following the *BSD code, we implement separate rtt timers for GETATTR,
LOOKUP, READ/READDIR/READLINK, and WRITE. In addition to this, there
is one extra timer for the COMMIT operation.
All the remaining RPC calls use the current system in which a fixed
timeout value gets set by the 'timeo' mount option.

In case of a timeout, the current exponential backoff algorithm is
implemented. Subsequent patches will improve this...
parent ad4d2648
...@@ -561,11 +561,10 @@ nlmclt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp) ...@@ -561,11 +561,10 @@ nlmclt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp)
#define nlmclt_decode_norep NULL #define nlmclt_decode_norep NULL
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype) \
{ "nlm_" #proc, \ { .p_procname = "nlm_" #proc, \
(kxdrproc_t) nlmclt_encode_##argtype, \ .p_encode = (kxdrproc_t) nlmclt_encode_##argtype, \
(kxdrproc_t) nlmclt_decode_##restype, \ .p_decode = (kxdrproc_t) nlmclt_decode_##restype, \
MAX(NLM_##argtype##_sz, NLM_##restype##_sz) << 2, \ .p_bufsiz = MAX(NLM_##argtype##_sz, NLM_##restype##_sz) << 2 \
0 \
} }
static struct rpc_procinfo nlm_procedures[] = { static struct rpc_procinfo nlm_procedures[] = {
......
...@@ -567,11 +567,10 @@ nlm4clt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp) ...@@ -567,11 +567,10 @@ nlm4clt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp)
#define nlm4clt_decode_norep NULL #define nlm4clt_decode_norep NULL
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype) \
{ "nlm4_" #proc, \ { .p_procname = "nlm4_" #proc, \
(kxdrproc_t) nlm4clt_encode_##argtype, \ .p_encode = (kxdrproc_t) nlm4clt_encode_##argtype, \
(kxdrproc_t) nlm4clt_decode_##restype, \ .p_decode = (kxdrproc_t) nlm4clt_decode_##restype, \
MAX(NLM4_##argtype##_sz, NLM4_##restype##_sz) << 2, \ .p_bufsiz = MAX(NLM4_##argtype##_sz, NLM4_##restype##_sz) << 2 \
0 \
} }
static struct rpc_procinfo nlm4_procedures[] = { static struct rpc_procinfo nlm4_procedures[] = {
......
...@@ -671,33 +671,32 @@ nfs_stat_to_errno(int stat) ...@@ -671,33 +671,32 @@ nfs_stat_to_errno(int stat)
# define MAX(a, b) (((a) > (b))? (a) : (b)) # define MAX(a, b) (((a) > (b))? (a) : (b))
#endif #endif
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype, timer) \
{ "nfs_" #proc, \ { .p_procname = "nfs_" #proc, \
(kxdrproc_t) nfs_xdr_##argtype, \ .p_encode = (kxdrproc_t) nfs_xdr_##argtype, \
(kxdrproc_t) nfs_xdr_##restype, \ .p_decode = (kxdrproc_t) nfs_xdr_##restype, \
MAX(NFS_##argtype##_sz,NFS_##restype##_sz) << 2, \ .p_bufsiz = MAX(NFS_##argtype##_sz,NFS_##restype##_sz) << 2, \
0 \ .p_timer = timer \
} }
static struct rpc_procinfo nfs_procedures[18] = { static struct rpc_procinfo nfs_procedures[18] = {
PROC(null, enc_void, dec_void), PROC(null, enc_void, dec_void, 0),
PROC(getattr, fhandle, attrstat), PROC(getattr, fhandle, attrstat, 1),
PROC(setattr, sattrargs, attrstat), PROC(setattr, sattrargs, attrstat, 0),
PROC(root, enc_void, dec_void), PROC(root, enc_void, dec_void, 0),
PROC(lookup, diropargs, diropres), PROC(lookup, diropargs, diropres, 2),
PROC(readlink, readlinkargs, readlinkres), PROC(readlink, readlinkargs, readlinkres, 3),
PROC(read, readargs, readres), PROC(read, readargs, readres, 3),
PROC(writecache, enc_void, dec_void), PROC(writecache, enc_void, dec_void, 0),
PROC(write, writeargs, writeres), PROC(write, writeargs, writeres, 4),
PROC(create, createargs, diropres), PROC(create, createargs, diropres, 0),
PROC(remove, diropargs, stat), PROC(remove, diropargs, stat, 0),
PROC(rename, renameargs, stat), PROC(rename, renameargs, stat, 0),
PROC(link, linkargs, stat), PROC(link, linkargs, stat, 0),
PROC(symlink, symlinkargs, stat), PROC(symlink, symlinkargs, stat, 0),
PROC(mkdir, createargs, diropres), PROC(mkdir, createargs, diropres, 0),
PROC(rmdir, diropargs, stat), PROC(rmdir, diropargs, stat, 0),
PROC(readdir, readdirargs, readdirres), PROC(readdir, readdirargs, readdirres, 3),
PROC(statfs, fhandle, statfsres), PROC(statfs, fhandle, statfsres, 0),
}; };
struct rpc_version nfs_version2 = { struct rpc_version nfs_version2 = {
......
...@@ -988,37 +988,37 @@ nfs3_xdr_commitres(struct rpc_rqst *req, u32 *p, struct nfs_writeres *res) ...@@ -988,37 +988,37 @@ nfs3_xdr_commitres(struct rpc_rqst *req, u32 *p, struct nfs_writeres *res)
# define MAX(a, b) (((a) > (b))? (a) : (b)) # define MAX(a, b) (((a) > (b))? (a) : (b))
#endif #endif
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype, timer) \
{ "nfs3_" #proc, \ { .p_procname = "nfs3_" #proc, \
(kxdrproc_t) nfs3_xdr_##argtype, \ .p_encode = (kxdrproc_t) nfs3_xdr_##argtype, \
(kxdrproc_t) nfs3_xdr_##restype, \ .p_decode = (kxdrproc_t) nfs3_xdr_##restype, \
MAX(NFS3_##argtype##_sz,NFS3_##restype##_sz) << 2, \ .p_bufsiz = MAX(NFS3_##argtype##_sz,NFS3_##restype##_sz) << 2, \
0 \ .p_timer = timer \
} }
static struct rpc_procinfo nfs3_procedures[22] = { static struct rpc_procinfo nfs3_procedures[22] = {
PROC(null, enc_void, dec_void), PROC(null, enc_void, dec_void, 0),
PROC(getattr, fhandle, attrstat), PROC(getattr, fhandle, attrstat, 1),
PROC(setattr, sattrargs, wccstat), PROC(setattr, sattrargs, wccstat, 0),
PROC(lookup, diropargs, lookupres), PROC(lookup, diropargs, lookupres, 2),
PROC(access, accessargs, accessres), PROC(access, accessargs, accessres, 1),
PROC(readlink, readlinkargs, readlinkres), PROC(readlink, readlinkargs, readlinkres, 3),
PROC(read, readargs, readres), PROC(read, readargs, readres, 3),
PROC(write, writeargs, writeres), PROC(write, writeargs, writeres, 4),
PROC(create, createargs, createres), PROC(create, createargs, createres, 0),
PROC(mkdir, mkdirargs, createres), PROC(mkdir, mkdirargs, createres, 0),
PROC(symlink, symlinkargs, createres), PROC(symlink, symlinkargs, createres, 0),
PROC(mknod, mknodargs, createres), PROC(mknod, mknodargs, createres, 0),
PROC(remove, diropargs, wccstat), PROC(remove, diropargs, wccstat, 0),
PROC(rmdir, diropargs, wccstat), PROC(rmdir, diropargs, wccstat, 0),
PROC(rename, renameargs, renameres), PROC(rename, renameargs, renameres, 0),
PROC(link, linkargs, linkres), PROC(link, linkargs, linkres, 0),
PROC(readdir, readdirargs, readdirres), PROC(readdir, readdirargs, readdirres, 3),
PROC(readdirplus, readdirargs, readdirres), PROC(readdirplus, readdirargs, readdirres, 3),
PROC(fsstat, fhandle, fsstatres), PROC(fsstat, fhandle, fsstatres, 0),
PROC(fsinfo, fhandle, fsinfores), PROC(fsinfo, fhandle, fsinfores, 0),
PROC(pathconf, fhandle, pathconfres), PROC(pathconf, fhandle, pathconfres, 0),
PROC(commit, commitargs, commitres), PROC(commit, commitargs, commitres, 5),
}; };
struct rpc_version nfs_version3 = { struct rpc_version nfs_version3 = {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <linux/sunrpc/auth.h> #include <linux/sunrpc/auth.h>
#include <linux/sunrpc/stats.h> #include <linux/sunrpc/stats.h>
#include <linux/sunrpc/xdr.h> #include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/timer.h>
#include <asm/signal.h> #include <asm/signal.h>
/* /*
...@@ -52,6 +53,8 @@ struct rpc_clnt { ...@@ -52,6 +53,8 @@ struct rpc_clnt {
unsigned int cl_flags; /* misc client flags */ unsigned int cl_flags; /* misc client flags */
unsigned long cl_hardmax; /* max hard timeout */ unsigned long cl_hardmax; /* max hard timeout */
struct rpc_rtt cl_rtt; /* RTO estimator data */
struct rpc_portmap cl_pmap; /* port mapping */ struct rpc_portmap cl_pmap; /* port mapping */
struct rpc_wait_queue cl_bindwait; /* waiting on getport() */ struct rpc_wait_queue cl_bindwait; /* waiting on getport() */
...@@ -91,6 +94,7 @@ struct rpc_procinfo { ...@@ -91,6 +94,7 @@ struct rpc_procinfo {
kxdrproc_t p_decode; /* XDR decode function */ kxdrproc_t p_decode; /* XDR decode function */
unsigned int p_bufsiz; /* req. buffer size */ unsigned int p_bufsiz; /* req. buffer size */
unsigned int p_count; /* call count */ unsigned int p_count; /* call count */
unsigned int p_timer; /* Which RTT timer to use */
}; };
#define rpcproc_bufsiz(clnt, proc) ((clnt)->cl_procinfo[proc].p_bufsiz) #define rpcproc_bufsiz(clnt, proc) ((clnt)->cl_procinfo[proc].p_bufsiz)
...@@ -98,6 +102,7 @@ struct rpc_procinfo { ...@@ -98,6 +102,7 @@ struct rpc_procinfo {
#define rpcproc_decode(clnt, proc) ((clnt)->cl_procinfo[proc].p_decode) #define rpcproc_decode(clnt, proc) ((clnt)->cl_procinfo[proc].p_decode)
#define rpcproc_name(clnt, proc) ((clnt)->cl_procinfo[proc].p_procname) #define rpcproc_name(clnt, proc) ((clnt)->cl_procinfo[proc].p_procname)
#define rpcproc_count(clnt, proc) ((clnt)->cl_procinfo[proc].p_count) #define rpcproc_count(clnt, proc) ((clnt)->cl_procinfo[proc].p_count)
#define rpcproc_timer(clnt, proc) ((clnt)->cl_procinfo[proc].p_timer)
#define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt)) #define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt))
#define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr) #define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr)
......
/*
* linux/include/linux/sunrpc/timer.h
*
* Declarations for the RPC transport timer.
*
* Copyright (C) 2002 Trond Myklebust <trond.myklebust@fys.uio.no>
*/
#ifndef _LINUX_SUNRPC_TIMER_H
#define _LINUX_SUNRPC_TIMER_H
struct rpc_rtt {
long timeo; /* default timeout value */
long srtt[5]; /* smoothed round trip time << 3 */
long sdrtt[5]; /* soothed medium deviation of RTT */
};
/* Reset every estimator in rt and record timeo as the default timeout. */
extern void rpc_init_rtt(struct rpc_rtt *rt, long timeo);
/* Fold the round-trip sample m into estimator number timer; timer 0 is a no-op. */
extern void rpc_update_rtt(struct rpc_rtt *rt, int timer, long m);
/* Return the timeout for estimator number timer; timer 0 yields the default timeout. */
extern long rpc_calc_rto(struct rpc_rtt *rt, int timer);
#endif /* _LINUX_SUNRPC_TIMER_H */
...@@ -98,9 +98,8 @@ struct rpc_rqst { ...@@ -98,9 +98,8 @@ struct rpc_rqst {
u32 rq_bytes_sent; /* Bytes we have sent */ u32 rq_bytes_sent; /* Bytes we have sent */
#ifdef RPC_PROFILE long rq_xtime; /* when transmitted */
unsigned long rq_xtime; /* when transmitted */ int rq_nresend;
#endif
}; };
#define rq_svec rq_snd_buf.head #define rq_svec rq_snd_buf.head
#define rq_slen rq_snd_buf.len #define rq_slen rq_snd_buf.len
......
...@@ -9,7 +9,8 @@ export-objs := sunrpc_syms.o ...@@ -9,7 +9,8 @@ export-objs := sunrpc_syms.o
sunrpc-y := clnt.o xprt.o sched.o \ sunrpc-y := clnt.o xprt.o sched.o \
auth.o auth_null.o auth_unix.o \ auth.o auth_null.o auth_unix.o \
svc.o svcsock.o svcauth.o \ svc.o svcsock.o svcauth.o \
pmap_clnt.o xdr.o sunrpc_syms.o pmap_clnt.o timer.o xdr.o \
sunrpc_syms.o
sunrpc-$(CONFIG_PROC_FS) += stats.o sunrpc-$(CONFIG_PROC_FS) += stats.o
sunrpc-$(CONFIG_SYSCTL) += sysctl.o sunrpc-$(CONFIG_SYSCTL) += sysctl.o
sunrpc-objs := $(sunrpc-y) sunrpc-objs := $(sunrpc-y)
......
...@@ -104,6 +104,8 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname, ...@@ -104,6 +104,8 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
if (!clnt->cl_port) if (!clnt->cl_port)
clnt->cl_autobind = 1; clnt->cl_autobind = 1;
rpc_init_rtt(&clnt->cl_rtt, xprt->timeout.to_initval);
if (!rpcauth_create(flavor, clnt)) if (!rpcauth_create(flavor, clnt))
goto out_no_auth; goto out_no_auth;
......
#include <linux/version.h>
#include <linux/types.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/timer.h>
/* Upper bound applied to the timeout computed by rpc_calc_rto() */
#define RPC_RTO_MAX (60*HZ)
/* Initial deviation term; also subtracted from the default timeout to seed srtt */
#define RPC_RTO_INIT (HZ/5)
/* Lower bound enforced on the deviation term by rpc_update_rtt() */
#define RPC_RTO_MIN (2)
/*
 * Initialise all RTT estimators of a client.
 *
 * rt:    estimator state to reset
 * timeo: default timeout; stored in rt->timeo and used to seed the estimators
 *
 * Each smoothed RTT is seeded with (timeo - RPC_RTO_INIT), stored scaled
 * by 8, and each deviation term with RPC_RTO_INIT. With that seeding
 * rpc_calc_rto() initially yields timeo (subject to its RPC_RTO_MAX cap)
 * whenever timeo >= RPC_RTO_INIT.
 */
void
rpc_init_rtt(struct rpc_rtt *rt, long timeo)
{
	long t = 0;
	int i;

	rt->timeo = timeo;

	/*
	 * Compare before shifting: left-shifting a negative difference is
	 * undefined behaviour in C, so a timeo below RPC_RTO_INIT must be
	 * clamped to 0 without ever being shifted.
	 */
	if (timeo > RPC_RTO_INIT)
		t = (timeo - RPC_RTO_INIT) << 3;

	for (i = 0; i < 5; i++) {
		rt->srtt[i] = t;
		rt->sdrtt[i] = RPC_RTO_INIT;
	}
}
/*
 * Fold a new round-trip measurement into one of the RTT estimators.
 *
 * rt:    estimator state to update
 * timer: 1-based estimator number; 0 selects no estimator and is ignored,
 *        as is any value beyond the number of estimators in rt
 * m:     measured round-trip time (the visible caller passes a difference
 *        of jiffies values); negative samples are discarded
 *
 * srtt is stored scaled by 8 and sdrtt scaled by 4, so the shifts below
 * apply gains of 1/8 and 1/4 to the respective error terms.
 */
void
rpc_update_rtt(struct rpc_rtt *rt, int timer, long m)
{
	const int ntimers = (int)(sizeof(rt->srtt) / sizeof(rt->srtt[0]));
	long *srtt, *sdrtt;

	/* Out-of-range timers would index outside srtt[] / sdrtt[] */
	if (timer < 1 || timer > ntimers)
		return;
	timer--;

	/*
	 * A negative sample (e.g. the tick counter wrapped between send and
	 * receive) carries no timing information and would pull the smoothed
	 * RTT below zero, so drop it.
	 */
	if (m < 0)
		return;
	if (m == 0)
		m = 1;

	/* srtt += (m - srtt/8): exponential average with gain 1/8 */
	srtt = &rt->srtt[timer];
	m -= *srtt >> 3;
	*srtt += m;

	/* sdrtt += (|error| - sdrtt/4): mean deviation with gain 1/4 */
	if (m < 0)
		m = -m;
	sdrtt = &rt->sdrtt[timer];
	m -= *sdrtt >> 2;
	*sdrtt += m;

	/* Set lower bound on the variance */
	if (*sdrtt < RPC_RTO_MIN)
		*sdrtt = RPC_RTO_MIN;
}
/*
 * Estimate the retransmit timeout (rto) for an NFS RPC sent via an
 * unreliable datagram transport.
 *
 * Use the mean and mean deviation of the rtt for the appropriate type of
 * RPC for the frequent RPCs, and a default for the others.
 * The justification for doing "other" this way is that these RPCs
 * happen so infrequently that a timer estimate would probably be stale.
 * Also, since many of these RPCs are non-idempotent, a conservative
 * timeout is desired.
 *
 *	getattr, lookup,
 *	read, write, commit	- A+4D
 *	other			- timeo
 *
 * rt:    estimator state to read
 * timer: 1-based estimator number; 0 (or any value outside the range of
 *        estimators held in rt) selects the default timeout rt->timeo
 *
 * Returns the timeout, capped at RPC_RTO_MAX for the estimated classes.
 */
long
rpc_calc_rto(struct rpc_rtt *rt, int timer)
{
	const int ntimers = (int)(sizeof(rt->srtt) / sizeof(rt->srtt[0]));
	long res;

	/* Never index outside srtt[] / sdrtt[]; fall back to the default */
	if (timer < 1 || timer > ntimers)
		return rt->timeo;
	timer--;

	/* srtt is A << 3 and sdrtt already holds 4D, giving A + 4D */
	res = (rt->srtt[timer] >> 3) + rt->sdrtt[timer];
	if (res > RPC_RTO_MAX)
		res = RPC_RTO_MAX;
	return res;
}
...@@ -480,13 +480,20 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) ...@@ -480,13 +480,20 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
* Complete reply received. * Complete reply received.
* The TCP code relies on us to remove the request from xprt->pending. * The TCP code relies on us to remove the request from xprt->pending.
*/ */
static inline void static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{ {
struct rpc_task *task = req->rq_task; struct rpc_task *task = req->rq_task;
struct rpc_clnt *clnt = task->tk_client;
/* Adjust congestion window */ /* Adjust congestion window */
if (!xprt->nocong) {
xprt_adjust_cwnd(xprt, copied); xprt_adjust_cwnd(xprt, copied);
if (!req->rq_nresend) {
int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
if (timer)
rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
}
#ifdef RPC_PROFILE #ifdef RPC_PROFILE
/* Profile only reads for now */ /* Profile only reads for now */
...@@ -934,6 +941,7 @@ xprt_timer(struct rpc_task *task) ...@@ -934,6 +941,7 @@ xprt_timer(struct rpc_task *task)
spin_lock(&xprt->sock_lock); spin_lock(&xprt->sock_lock);
if (req->rq_received) if (req->rq_received)
goto out; goto out;
req->rq_nresend++;
xprt_adjust_cwnd(xprt, -ETIMEDOUT); xprt_adjust_cwnd(xprt, -ETIMEDOUT);
dprintk("RPC: %4d xprt_timer (%s request)\n", dprintk("RPC: %4d xprt_timer (%s request)\n",
...@@ -982,15 +990,13 @@ xprt_transmit(struct rpc_task *task) ...@@ -982,15 +990,13 @@ xprt_transmit(struct rpc_task *task)
if (!xprt_lock_write(xprt, task)) if (!xprt_lock_write(xprt, task))
return; return;
#ifdef RPC_PROFILE
req->rq_xtime = jiffies;
#endif
do_xprt_transmit(task); do_xprt_transmit(task);
} }
static void static void
do_xprt_transmit(struct rpc_task *task) do_xprt_transmit(struct rpc_task *task)
{ {
struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt; struct rpc_xprt *xprt = req->rq_xprt;
int status, retry = 0; int status, retry = 0;
...@@ -1002,6 +1008,7 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1002,6 +1008,7 @@ do_xprt_transmit(struct rpc_task *task)
*/ */
while (1) { while (1) {
xprt_clear_wspace(xprt); xprt_clear_wspace(xprt);
req->rq_xtime = jiffies;
status = xprt_sendmsg(xprt, req); status = xprt_sendmsg(xprt, req);
if (status < 0) if (status < 0)
...@@ -1065,6 +1072,20 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1065,6 +1072,20 @@ do_xprt_transmit(struct rpc_task *task)
out_receive: out_receive:
dprintk("RPC: %4d xmit complete\n", task->tk_pid); dprintk("RPC: %4d xmit complete\n", task->tk_pid);
/* Set the task's receive timeout value */ /* Set the task's receive timeout value */
if (!xprt->nocong) {
int backoff;
task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
rpcproc_timer(clnt, task->tk_msg.rpc_proc));
/* If we are retransmitting, increment the timeout counter */
backoff = req->rq_nresend;
if (backoff) {
if (backoff > 7)
backoff = 7;
task->tk_timeout <<= backoff;
}
if (task->tk_timeout > req->rq_timeout.to_maxval)
task->tk_timeout = req->rq_timeout.to_maxval;
} else
task->tk_timeout = req->rq_timeout.to_current; task->tk_timeout = req->rq_timeout.to_current;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (!req->rq_received) if (!req->rq_received)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment