Commit 01b1df28 authored by Dave Kleikamp's avatar Dave Kleikamp

Merge jfs@jfs.bkbits.net:linux-2.5

into kleikamp.austin.ibm.com:/home/shaggy/bk/jfs-2.5
parents 648044ef 0d84f0ac
VERSION = 2 VERSION = 2
PATCHLEVEL = 5 PATCHLEVEL = 5
SUBLEVEL = 25 SUBLEVEL = 26
EXTRAVERSION = EXTRAVERSION =
# *DOCUMENTATION* # *DOCUMENTATION*
......
...@@ -561,11 +561,10 @@ nlmclt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp) ...@@ -561,11 +561,10 @@ nlmclt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp)
#define nlmclt_decode_norep NULL #define nlmclt_decode_norep NULL
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype) \
{ "nlm_" #proc, \ { .p_procname = "nlm_" #proc, \
(kxdrproc_t) nlmclt_encode_##argtype, \ .p_encode = (kxdrproc_t) nlmclt_encode_##argtype, \
(kxdrproc_t) nlmclt_decode_##restype, \ .p_decode = (kxdrproc_t) nlmclt_decode_##restype, \
MAX(NLM_##argtype##_sz, NLM_##restype##_sz) << 2, \ .p_bufsiz = MAX(NLM_##argtype##_sz, NLM_##restype##_sz) << 2 \
0 \
} }
static struct rpc_procinfo nlm_procedures[] = { static struct rpc_procinfo nlm_procedures[] = {
......
...@@ -566,12 +566,11 @@ nlm4clt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp) ...@@ -566,12 +566,11 @@ nlm4clt_decode_res(struct rpc_rqst *req, u32 *p, struct nlm_res *resp)
*/ */
#define nlm4clt_decode_norep NULL #define nlm4clt_decode_norep NULL
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype) \
{ "nlm4_" #proc, \ { .p_procname = "nlm4_" #proc, \
(kxdrproc_t) nlm4clt_encode_##argtype, \ .p_encode = (kxdrproc_t) nlm4clt_encode_##argtype, \
(kxdrproc_t) nlm4clt_decode_##restype, \ .p_decode = (kxdrproc_t) nlm4clt_decode_##restype, \
MAX(NLM4_##argtype##_sz, NLM4_##restype##_sz) << 2, \ .p_bufsiz = MAX(NLM4_##argtype##_sz, NLM4_##restype##_sz) << 2 \
0 \
} }
static struct rpc_procinfo nlm4_procedures[] = { static struct rpc_procinfo nlm4_procedures[] = {
......
...@@ -671,33 +671,32 @@ nfs_stat_to_errno(int stat) ...@@ -671,33 +671,32 @@ nfs_stat_to_errno(int stat)
# define MAX(a, b) (((a) > (b))? (a) : (b)) # define MAX(a, b) (((a) > (b))? (a) : (b))
#endif #endif
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype, timer) \
{ "nfs_" #proc, \ { .p_procname = "nfs_" #proc, \
(kxdrproc_t) nfs_xdr_##argtype, \ .p_encode = (kxdrproc_t) nfs_xdr_##argtype, \
(kxdrproc_t) nfs_xdr_##restype, \ .p_decode = (kxdrproc_t) nfs_xdr_##restype, \
MAX(NFS_##argtype##_sz,NFS_##restype##_sz) << 2, \ .p_bufsiz = MAX(NFS_##argtype##_sz,NFS_##restype##_sz) << 2, \
0 \ .p_timer = timer \
} }
static struct rpc_procinfo nfs_procedures[18] = { static struct rpc_procinfo nfs_procedures[18] = {
PROC(null, enc_void, dec_void), PROC(null, enc_void, dec_void, 0),
PROC(getattr, fhandle, attrstat), PROC(getattr, fhandle, attrstat, 1),
PROC(setattr, sattrargs, attrstat), PROC(setattr, sattrargs, attrstat, 0),
PROC(root, enc_void, dec_void), PROC(root, enc_void, dec_void, 0),
PROC(lookup, diropargs, diropres), PROC(lookup, diropargs, diropres, 2),
PROC(readlink, readlinkargs, readlinkres), PROC(readlink, readlinkargs, readlinkres, 3),
PROC(read, readargs, readres), PROC(read, readargs, readres, 3),
PROC(writecache, enc_void, dec_void), PROC(writecache, enc_void, dec_void, 0),
PROC(write, writeargs, writeres), PROC(write, writeargs, writeres, 4),
PROC(create, createargs, diropres), PROC(create, createargs, diropres, 0),
PROC(remove, diropargs, stat), PROC(remove, diropargs, stat, 0),
PROC(rename, renameargs, stat), PROC(rename, renameargs, stat, 0),
PROC(link, linkargs, stat), PROC(link, linkargs, stat, 0),
PROC(symlink, symlinkargs, stat), PROC(symlink, symlinkargs, stat, 0),
PROC(mkdir, createargs, diropres), PROC(mkdir, createargs, diropres, 0),
PROC(rmdir, diropargs, stat), PROC(rmdir, diropargs, stat, 0),
PROC(readdir, readdirargs, readdirres), PROC(readdir, readdirargs, readdirres, 3),
PROC(statfs, fhandle, statfsres), PROC(statfs, fhandle, statfsres, 0),
}; };
struct rpc_version nfs_version2 = { struct rpc_version nfs_version2 = {
......
...@@ -988,37 +988,37 @@ nfs3_xdr_commitres(struct rpc_rqst *req, u32 *p, struct nfs_writeres *res) ...@@ -988,37 +988,37 @@ nfs3_xdr_commitres(struct rpc_rqst *req, u32 *p, struct nfs_writeres *res)
# define MAX(a, b) (((a) > (b))? (a) : (b)) # define MAX(a, b) (((a) > (b))? (a) : (b))
#endif #endif
#define PROC(proc, argtype, restype) \ #define PROC(proc, argtype, restype, timer) \
{ "nfs3_" #proc, \ { .p_procname = "nfs3_" #proc, \
(kxdrproc_t) nfs3_xdr_##argtype, \ .p_encode = (kxdrproc_t) nfs3_xdr_##argtype, \
(kxdrproc_t) nfs3_xdr_##restype, \ .p_decode = (kxdrproc_t) nfs3_xdr_##restype, \
MAX(NFS3_##argtype##_sz,NFS3_##restype##_sz) << 2, \ .p_bufsiz = MAX(NFS3_##argtype##_sz,NFS3_##restype##_sz) << 2, \
0 \ .p_timer = timer \
} }
static struct rpc_procinfo nfs3_procedures[22] = { static struct rpc_procinfo nfs3_procedures[22] = {
PROC(null, enc_void, dec_void), PROC(null, enc_void, dec_void, 0),
PROC(getattr, fhandle, attrstat), PROC(getattr, fhandle, attrstat, 1),
PROC(setattr, sattrargs, wccstat), PROC(setattr, sattrargs, wccstat, 0),
PROC(lookup, diropargs, lookupres), PROC(lookup, diropargs, lookupres, 2),
PROC(access, accessargs, accessres), PROC(access, accessargs, accessres, 1),
PROC(readlink, readlinkargs, readlinkres), PROC(readlink, readlinkargs, readlinkres, 3),
PROC(read, readargs, readres), PROC(read, readargs, readres, 3),
PROC(write, writeargs, writeres), PROC(write, writeargs, writeres, 4),
PROC(create, createargs, createres), PROC(create, createargs, createres, 0),
PROC(mkdir, mkdirargs, createres), PROC(mkdir, mkdirargs, createres, 0),
PROC(symlink, symlinkargs, createres), PROC(symlink, symlinkargs, createres, 0),
PROC(mknod, mknodargs, createres), PROC(mknod, mknodargs, createres, 0),
PROC(remove, diropargs, wccstat), PROC(remove, diropargs, wccstat, 0),
PROC(rmdir, diropargs, wccstat), PROC(rmdir, diropargs, wccstat, 0),
PROC(rename, renameargs, renameres), PROC(rename, renameargs, renameres, 0),
PROC(link, linkargs, linkres), PROC(link, linkargs, linkres, 0),
PROC(readdir, readdirargs, readdirres), PROC(readdir, readdirargs, readdirres, 3),
PROC(readdirplus, readdirargs, readdirres), PROC(readdirplus, readdirargs, readdirres, 3),
PROC(fsstat, fhandle, fsstatres), PROC(fsstat, fhandle, fsstatres, 0),
PROC(fsinfo, fhandle, fsinfores), PROC(fsinfo, fhandle, fsinfores, 0),
PROC(pathconf, fhandle, pathconfres), PROC(pathconf, fhandle, pathconfres, 0),
PROC(commit, commitargs, commitres), PROC(commit, commitargs, commitres, 5),
}; };
struct rpc_version nfs_version3 = { struct rpc_version nfs_version3 = {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <linux/sunrpc/auth.h> #include <linux/sunrpc/auth.h>
#include <linux/sunrpc/stats.h> #include <linux/sunrpc/stats.h>
#include <linux/sunrpc/xdr.h> #include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/timer.h>
#include <asm/signal.h> #include <asm/signal.h>
/* /*
...@@ -52,6 +53,8 @@ struct rpc_clnt { ...@@ -52,6 +53,8 @@ struct rpc_clnt {
unsigned int cl_flags; /* misc client flags */ unsigned int cl_flags; /* misc client flags */
unsigned long cl_hardmax; /* max hard timeout */ unsigned long cl_hardmax; /* max hard timeout */
struct rpc_rtt cl_rtt; /* RTO estimator data */
struct rpc_portmap cl_pmap; /* port mapping */ struct rpc_portmap cl_pmap; /* port mapping */
struct rpc_wait_queue cl_bindwait; /* waiting on getport() */ struct rpc_wait_queue cl_bindwait; /* waiting on getport() */
...@@ -91,6 +94,7 @@ struct rpc_procinfo { ...@@ -91,6 +94,7 @@ struct rpc_procinfo {
kxdrproc_t p_decode; /* XDR decode function */ kxdrproc_t p_decode; /* XDR decode function */
unsigned int p_bufsiz; /* req. buffer size */ unsigned int p_bufsiz; /* req. buffer size */
unsigned int p_count; /* call count */ unsigned int p_count; /* call count */
unsigned int p_timer; /* Which RTT timer to use */
}; };
#define rpcproc_bufsiz(clnt, proc) ((clnt)->cl_procinfo[proc].p_bufsiz) #define rpcproc_bufsiz(clnt, proc) ((clnt)->cl_procinfo[proc].p_bufsiz)
...@@ -98,6 +102,7 @@ struct rpc_procinfo { ...@@ -98,6 +102,7 @@ struct rpc_procinfo {
#define rpcproc_decode(clnt, proc) ((clnt)->cl_procinfo[proc].p_decode) #define rpcproc_decode(clnt, proc) ((clnt)->cl_procinfo[proc].p_decode)
#define rpcproc_name(clnt, proc) ((clnt)->cl_procinfo[proc].p_procname) #define rpcproc_name(clnt, proc) ((clnt)->cl_procinfo[proc].p_procname)
#define rpcproc_count(clnt, proc) ((clnt)->cl_procinfo[proc].p_count) #define rpcproc_count(clnt, proc) ((clnt)->cl_procinfo[proc].p_count)
#define rpcproc_timer(clnt, proc) ((clnt)->cl_procinfo[proc].p_timer)
#define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt)) #define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt))
#define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr) #define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr)
......
/*
* linux/include/linux/sunrpc/timer.h
*
* Declarations for the RPC transport timer.
*
* Copyright (C) 2002 Trond Myklebust <trond.myklebust@fys.uio.no>
*/
#ifndef _LINUX_SUNRPC_TIMER_H
#define _LINUX_SUNRPC_TIMER_H
#include <asm/atomic.h>
struct rpc_rtt {
long timeo; /* default timeout value */
long srtt[5]; /* smoothed round trip time << 3 */
long sdrtt[5]; /* soothed medium deviation of RTT */
atomic_t ntimeouts; /* Global count of the number of timeouts */
};
extern void rpc_init_rtt(struct rpc_rtt *rt, long timeo);
extern void rpc_update_rtt(struct rpc_rtt *rt, int timer, long m);
extern long rpc_calc_rto(struct rpc_rtt *rt, int timer);
static inline void rpc_inc_timeo(struct rpc_rtt *rt)
{
atomic_inc(&rt->ntimeouts);
}
static inline void rpc_clear_timeo(struct rpc_rtt *rt)
{
atomic_set(&rt->ntimeouts, 0);
}
static inline int rpc_ntimeo(struct rpc_rtt *rt)
{
return atomic_read(&rt->ntimeouts);
}
#endif /* _LINUX_SUNRPC_TIMER_H */
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
* The transport code maintains an estimate on the maximum number of out- * The transport code maintains an estimate on the maximum number of out-
* standing RPC requests, using a smoothed version of the congestion * standing RPC requests, using a smoothed version of the congestion
* avoidance implemented in 44BSD. This is basically the Van Jacobson * avoidance implemented in 44BSD. This is basically the Van Jacobson
* slow start algorithm: If a retransmit occurs, the congestion window is * congestion algorithm: If a retransmit occurs, the congestion window is
* halved; otherwise, it is incremented by 1/cwnd when * halved; otherwise, it is incremented by 1/cwnd when
* *
* - a reply is received and * - a reply is received and
...@@ -32,15 +32,13 @@ ...@@ -32,15 +32,13 @@
* Note: on machines with low memory we should probably use a smaller * Note: on machines with low memory we should probably use a smaller
* MAXREQS value: At 32 outstanding reqs with 8 megs of RAM, fragment * MAXREQS value: At 32 outstanding reqs with 8 megs of RAM, fragment
* reassembly will frequently run out of memory. * reassembly will frequently run out of memory.
* Come Linux 2.3, we'll handle fragments directly.
*/ */
#define RPC_MAXCONG 16 #define RPC_MAXCONG (16)
#define RPC_MAXREQS (RPC_MAXCONG + 1) #define RPC_MAXREQS RPC_MAXCONG
#define RPC_CWNDSCALE 256 #define RPC_CWNDSCALE (256)
#define RPC_MAXCWND (RPC_MAXCONG * RPC_CWNDSCALE) #define RPC_MAXCWND (RPC_MAXCONG * RPC_CWNDSCALE)
#define RPC_INITCWND RPC_CWNDSCALE #define RPC_INITCWND (RPC_MAXCWND >> 1)
#define RPCXPRT_CONGESTED(xprt) \ #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
((xprt)->cong >= (xprt)->cwnd)
/* Default timeout values */ /* Default timeout values */
#define RPC_MAX_UDP_TIMEOUT (60*HZ) #define RPC_MAX_UDP_TIMEOUT (60*HZ)
...@@ -83,6 +81,7 @@ struct rpc_rqst { ...@@ -83,6 +81,7 @@ struct rpc_rqst {
struct rpc_task * rq_task; /* RPC task data */ struct rpc_task * rq_task; /* RPC task data */
__u32 rq_xid; /* request XID */ __u32 rq_xid; /* request XID */
struct rpc_rqst * rq_next; /* free list */ struct rpc_rqst * rq_next; /* free list */
int rq_cong; /* has incremented xprt->cong */
int rq_received; /* receive completed */ int rq_received; /* receive completed */
struct list_head rq_list; struct list_head rq_list;
...@@ -98,9 +97,9 @@ struct rpc_rqst { ...@@ -98,9 +97,9 @@ struct rpc_rqst {
u32 rq_bytes_sent; /* Bytes we have sent */ u32 rq_bytes_sent; /* Bytes we have sent */
#ifdef RPC_PROFILE long rq_xtime; /* when transmitted */
unsigned long rq_xtime; /* when transmitted */ int rq_ntimeo;
#endif int rq_nresend;
}; };
#define rq_svec rq_snd_buf.head #define rq_svec rq_snd_buf.head
#define rq_slen rq_snd_buf.len #define rq_slen rq_snd_buf.len
...@@ -122,9 +121,9 @@ struct rpc_xprt { ...@@ -122,9 +121,9 @@ struct rpc_xprt {
unsigned long cong; /* current congestion */ unsigned long cong; /* current congestion */
unsigned long cwnd; /* congestion window */ unsigned long cwnd; /* congestion window */
unsigned long congtime; /* hold cwnd until then */
struct rpc_wait_queue sending; /* requests waiting to send */ struct rpc_wait_queue sending; /* requests waiting to send */
struct rpc_wait_queue resend; /* requests waiting to resend */
struct rpc_wait_queue pending; /* requests in flight */ struct rpc_wait_queue pending; /* requests in flight */
struct rpc_wait_queue backlog; /* waiting for slot */ struct rpc_wait_queue backlog; /* waiting for slot */
struct rpc_rqst * free; /* free slots */ struct rpc_rqst * free; /* free slots */
......
...@@ -9,7 +9,8 @@ export-objs := sunrpc_syms.o ...@@ -9,7 +9,8 @@ export-objs := sunrpc_syms.o
sunrpc-y := clnt.o xprt.o sched.o \ sunrpc-y := clnt.o xprt.o sched.o \
auth.o auth_null.o auth_unix.o \ auth.o auth_null.o auth_unix.o \
svc.o svcsock.o svcauth.o \ svc.o svcsock.o svcauth.o \
pmap_clnt.o xdr.o sunrpc_syms.o pmap_clnt.o timer.o xdr.o \
sunrpc_syms.o
sunrpc-$(CONFIG_PROC_FS) += stats.o sunrpc-$(CONFIG_PROC_FS) += stats.o
sunrpc-$(CONFIG_SYSCTL) += sysctl.o sunrpc-$(CONFIG_SYSCTL) += sysctl.o
sunrpc-objs := $(sunrpc-y) sunrpc-objs := $(sunrpc-y)
......
...@@ -104,6 +104,8 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname, ...@@ -104,6 +104,8 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
if (!clnt->cl_port) if (!clnt->cl_port)
clnt->cl_autobind = 1; clnt->cl_autobind = 1;
rpc_init_rtt(&clnt->cl_rtt, xprt->timeout.to_initval);
if (!rpcauth_create(flavor, clnt)) if (!rpcauth_create(flavor, clnt))
goto out_no_auth; goto out_no_auth;
...@@ -669,7 +671,7 @@ call_timeout(struct rpc_task *task) ...@@ -669,7 +671,7 @@ call_timeout(struct rpc_task *task)
rpc_exit(task, -EIO); rpc_exit(task, -EIO);
return; return;
} }
if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) { if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN) && rpc_ntimeo(&clnt->cl_rtt) > 7) {
task->tk_flags |= RPC_CALL_MAJORSEEN; task->tk_flags |= RPC_CALL_MAJORSEEN;
if (req) if (req)
printk(KERN_NOTICE "%s: server %s not responding, still trying\n", printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
......
#include <linux/version.h>
#include <linux/types.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/timer.h>
#define RPC_RTO_MAX (60*HZ)
#define RPC_RTO_INIT (HZ/5)
#define RPC_RTO_MIN (2)
void
rpc_init_rtt(struct rpc_rtt *rt, long timeo)
{
long t = (timeo - RPC_RTO_INIT) << 3;
int i;
rt->timeo = timeo;
if (t < 0)
t = 0;
for (i = 0; i < 5; i++) {
rt->srtt[i] = t;
rt->sdrtt[i] = RPC_RTO_INIT;
}
atomic_set(&rt->ntimeouts, 0);
}
void
rpc_update_rtt(struct rpc_rtt *rt, int timer, long m)
{
long *srtt, *sdrtt;
if (timer-- == 0)
return;
if (m == 0)
m = 1;
srtt = &rt->srtt[timer];
m -= *srtt >> 3;
*srtt += m;
if (m < 0)
m = -m;
sdrtt = &rt->sdrtt[timer];
m -= *sdrtt >> 2;
*sdrtt += m;
/* Set lower bound on the variance */
if (*sdrtt < RPC_RTO_MIN)
*sdrtt = RPC_RTO_MIN;
}
/*
* Estimate rto for an nfs rpc sent via. an unreliable datagram.
* Use the mean and mean deviation of rtt for the appropriate type of rpc
* for the frequent rpcs and a default for the others.
* The justification for doing "other" this way is that these rpcs
* happen so infrequently that timer est. would probably be stale.
* Also, since many of these rpcs are
* non-idempotent, a conservative timeout is desired.
* getattr, lookup,
* read, write, commit - A+4D
* other - timeo
*/
long
rpc_calc_rto(struct rpc_rtt *rt, int timer)
{
long res;
if (timer-- == 0)
return rt->timeo;
res = (rt->srtt[timer] >> 3) + rt->sdrtt[timer];
if (res > RPC_RTO_MAX)
res = RPC_RTO_MAX;
return res;
}
...@@ -242,11 +242,11 @@ void xdr_kunmap(struct xdr_buf *xdr, size_t base) ...@@ -242,11 +242,11 @@ void xdr_kunmap(struct xdr_buf *xdr, size_t base)
return; return;
if (base || xdr->page_base) { if (base || xdr->page_base) {
pglen -= base; pglen -= base;
base += xdr->page_base;
ppage += base >> PAGE_CACHE_SHIFT; ppage += base >> PAGE_CACHE_SHIFT;
} }
for (;;) { for (;;) {
flush_dcache_page(*ppage); flush_dcache_page(*ppage);
flush_page_to_ram(*ppage);
kunmap(*ppage); kunmap(*ppage);
if (pglen <= PAGE_CACHE_SIZE) if (pglen <= PAGE_CACHE_SIZE)
break; break;
......
...@@ -77,6 +77,8 @@ ...@@ -77,6 +77,8 @@
# define RPCDBG_FACILITY RPCDBG_XPRT # define RPCDBG_FACILITY RPCDBG_XPRT
#endif #endif
#define XPRT_MAX_BACKOFF (8)
/* /*
* Local functions * Local functions
*/ */
...@@ -87,6 +89,7 @@ static void xprt_disconnect(struct rpc_xprt *); ...@@ -87,6 +89,7 @@ static void xprt_disconnect(struct rpc_xprt *);
static void xprt_reconn_status(struct rpc_task *task); static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *); static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int xprt_bind_socket(struct rpc_xprt *, struct socket *); static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
#ifdef RPC_DEBUG_DATA #ifdef RPC_DEBUG_DATA
/* /*
...@@ -138,31 +141,60 @@ xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -138,31 +141,60 @@ xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
int retval; int retval;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (!xprt->snd_task) if (!xprt->snd_task) {
xprt->snd_task = task; if (xprt->nocong || __xprt_get_cong(xprt, task))
else if (xprt->snd_task != task) { xprt->snd_task = task;
}
if (xprt->snd_task != task) {
dprintk("RPC: %4d TCP write queue full (task %d)\n", dprintk("RPC: %4d TCP write queue full (task %d)\n",
task->tk_pid, xprt->snd_task->tk_pid); task->tk_pid, xprt->snd_task->tk_pid);
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->sending, task, NULL, NULL); if (task->tk_rqstp->rq_nresend)
rpc_sleep_on(&xprt->resend, task, NULL, NULL);
else
rpc_sleep_on(&xprt->sending, task, NULL, NULL);
} }
retval = xprt->snd_task == task; retval = xprt->snd_task == task;
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
return retval; return retval;
} }
static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
struct rpc_task *task;
if (xprt->snd_task)
return;
if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
return;
task = rpc_wake_up_next(&xprt->resend);
if (!task) {
task = rpc_wake_up_next(&xprt->sending);
if (!task)
return;
}
if (xprt->nocong || __xprt_get_cong(xprt, task))
xprt->snd_task = task;
}
/* /*
* Releases the socket for use by other requests. * Releases the socket for use by other requests.
*/ */
static void static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task)
xprt->snd_task = NULL;
__xprt_lock_write_next(xprt);
}
static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (xprt->snd_task == task) { __xprt_release_write(xprt, task);
xprt->snd_task = NULL;
rpc_wake_up_next(&xprt->sending);
}
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
} }
...@@ -233,6 +265,40 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -233,6 +265,40 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
return result; return result;
} }
/*
* Van Jacobson congestion avoidance. Check if the congestion window
* overflowed. Put the task to sleep if this is the case.
*/
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
if (req->rq_cong)
return 1;
dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
task->tk_pid, xprt->cong, xprt->cwnd);
if (RPCXPRT_CONGESTED(xprt))
return 0;
req->rq_cong = 1;
xprt->cong += RPC_CWNDSCALE;
return 1;
}
/*
* Adjust the congestion window, and wake up the next task
* that has been sleeping due to congestion
*/
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (!req->rq_cong)
return;
req->rq_cong = 0;
xprt->cong -= RPC_CWNDSCALE;
__xprt_lock_write_next(xprt);
}
/* /*
* Adjust RPC congestion window * Adjust RPC congestion window
* We use a time-smoothed congestion estimator to avoid heavy oscillation. * We use a time-smoothed congestion estimator to avoid heavy oscillation.
...@@ -242,40 +308,22 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) ...@@ -242,40 +308,22 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{ {
unsigned long cwnd; unsigned long cwnd;
if (xprt->nocong)
return;
/*
* Note: we're in a BH context
*/
spin_lock(&xprt->xprt_lock);
cwnd = xprt->cwnd; cwnd = xprt->cwnd;
if (result >= 0) { if (result >= 0 && xprt->cong <= cwnd) {
if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
goto out;
/* The (cwnd >> 1) term makes sure /* The (cwnd >> 1) term makes sure
* the result gets rounded properly. */ * the result gets rounded properly. */
cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
if (cwnd > RPC_MAXCWND) if (cwnd > RPC_MAXCWND)
cwnd = RPC_MAXCWND; cwnd = RPC_MAXCWND;
else __xprt_lock_write_next(xprt);
pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
(xprt->congtime-jiffies)*1000/HZ);
} else if (result == -ETIMEDOUT) { } else if (result == -ETIMEDOUT) {
if ((cwnd >>= 1) < RPC_CWNDSCALE) cwnd >>= 1;
if (cwnd < RPC_CWNDSCALE)
cwnd = RPC_CWNDSCALE; cwnd = RPC_CWNDSCALE;
xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
(xprt->congtime-jiffies)*1000/HZ);
pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
} }
dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
xprt->cong, xprt->cwnd, cwnd);
xprt->cwnd = cwnd; xprt->cwnd = cwnd;
out:
spin_unlock(&xprt->xprt_lock);
} }
/* /*
...@@ -427,7 +475,7 @@ xprt_reconnect(struct rpc_task *task) ...@@ -427,7 +475,7 @@ xprt_reconnect(struct rpc_task *task)
/* if the socket is already closing, delay 5 secs */ /* if the socket is already closing, delay 5 secs */
if ((1<<inet->state) & ~(TCP_SYN_SENT|TCP_SYN_RECV)) if ((1<<inet->state) & ~(TCP_SYN_SENT|TCP_SYN_RECV))
task->tk_timeout = 5*HZ; task->tk_timeout = 5*HZ;
rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL); rpc_sleep_on(&xprt->pending, task, xprt_reconn_status, NULL);
release_sock(inet); release_sock(inet);
return; return;
} }
...@@ -480,13 +528,23 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) ...@@ -480,13 +528,23 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
* Complete reply received. * Complete reply received.
* The TCP code relies on us to remove the request from xprt->pending. * The TCP code relies on us to remove the request from xprt->pending.
*/ */
static inline void static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{ {
struct rpc_task *task = req->rq_task; struct rpc_task *task = req->rq_task;
struct rpc_clnt *clnt = task->tk_client;
/* Adjust congestion window */ /* Adjust congestion window */
xprt_adjust_cwnd(xprt, copied); if (!xprt->nocong) {
xprt_adjust_cwnd(xprt, copied);
__xprt_put_cong(xprt, req);
if (!req->rq_nresend) {
int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
if (timer)
rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
}
rpc_clear_timeo(&clnt->cl_rtt);
}
#ifdef RPC_PROFILE #ifdef RPC_PROFILE
/* Profile only reads for now */ /* Profile only reads for now */
...@@ -872,7 +930,7 @@ tcp_state_change(struct sock *sk) ...@@ -872,7 +930,7 @@ tcp_state_change(struct sock *sk)
xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
spin_lock(&xprt->sock_lock); spin_lock(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
rpc_wake_up_task(xprt->snd_task); rpc_wake_up_task(xprt->snd_task);
spin_unlock(&xprt->sock_lock); spin_unlock(&xprt->sock_lock);
break; break;
...@@ -909,7 +967,7 @@ xprt_write_space(struct sock *sk) ...@@ -909,7 +967,7 @@ xprt_write_space(struct sock *sk)
if (!xprt_test_and_set_wspace(xprt)) { if (!xprt_test_and_set_wspace(xprt)) {
spin_lock(&xprt->sock_lock); spin_lock(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
rpc_wake_up_task(xprt->snd_task); rpc_wake_up_task(xprt->snd_task);
spin_unlock(&xprt->sock_lock); spin_unlock(&xprt->sock_lock);
} }
...@@ -922,6 +980,21 @@ xprt_write_space(struct sock *sk) ...@@ -922,6 +980,21 @@ xprt_write_space(struct sock *sk)
} }
} }
/*
* Exponential backoff for UDP retries
*/
static inline int
xprt_expbackoff(struct rpc_task *task, struct rpc_rqst *req)
{
int backoff;
req->rq_ntimeo++;
backoff = min(rpc_ntimeo(&task->tk_client->cl_rtt), XPRT_MAX_BACKOFF);
if (req->rq_ntimeo < (1 << backoff))
return 1;
return 0;
}
/* /*
* RPC receive timeout handler. * RPC receive timeout handler.
*/ */
...@@ -934,7 +1007,17 @@ xprt_timer(struct rpc_task *task) ...@@ -934,7 +1007,17 @@ xprt_timer(struct rpc_task *task)
spin_lock(&xprt->sock_lock); spin_lock(&xprt->sock_lock);
if (req->rq_received) if (req->rq_received)
goto out; goto out;
xprt_adjust_cwnd(xprt, -ETIMEDOUT);
if (!xprt->nocong) {
if (xprt_expbackoff(task, req)) {
rpc_add_timer(task, xprt_timer);
goto out_unlock;
}
rpc_inc_timeo(&task->tk_client->cl_rtt);
xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
__xprt_put_cong(xprt, req);
}
req->rq_nresend++;
dprintk("RPC: %4d xprt_timer (%s request)\n", dprintk("RPC: %4d xprt_timer (%s request)\n",
task->tk_pid, req ? "pending" : "backlogged"); task->tk_pid, req ? "pending" : "backlogged");
...@@ -943,6 +1026,7 @@ xprt_timer(struct rpc_task *task) ...@@ -943,6 +1026,7 @@ xprt_timer(struct rpc_task *task)
out: out:
task->tk_timeout = 0; task->tk_timeout = 0;
rpc_wake_up_task(task); rpc_wake_up_task(task);
out_unlock:
spin_unlock(&xprt->sock_lock); spin_unlock(&xprt->sock_lock);
} }
...@@ -982,15 +1066,13 @@ xprt_transmit(struct rpc_task *task) ...@@ -982,15 +1066,13 @@ xprt_transmit(struct rpc_task *task)
if (!xprt_lock_write(xprt, task)) if (!xprt_lock_write(xprt, task))
return; return;
#ifdef RPC_PROFILE
req->rq_xtime = jiffies;
#endif
do_xprt_transmit(task); do_xprt_transmit(task);
} }
static void static void
do_xprt_transmit(struct rpc_task *task) do_xprt_transmit(struct rpc_task *task)
{ {
struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt; struct rpc_xprt *xprt = req->rq_xprt;
int status, retry = 0; int status, retry = 0;
...@@ -1002,6 +1084,7 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1002,6 +1084,7 @@ do_xprt_transmit(struct rpc_task *task)
*/ */
while (1) { while (1) {
xprt_clear_wspace(xprt); xprt_clear_wspace(xprt);
req->rq_xtime = jiffies;
status = xprt_sendmsg(xprt, req); status = xprt_sendmsg(xprt, req);
if (status < 0) if (status < 0)
...@@ -1043,7 +1126,7 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1043,7 +1126,7 @@ do_xprt_transmit(struct rpc_task *task)
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (!xprt_wspace(xprt)) { if (!xprt_wspace(xprt)) {
task->tk_timeout = req->rq_timeout.to_current; task->tk_timeout = req->rq_timeout.to_current;
rpc_sleep_on(&xprt->sending, task, NULL, NULL); rpc_sleep_on(&xprt->pending, task, NULL, NULL);
} }
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
return; return;
...@@ -1059,19 +1142,29 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1059,19 +1142,29 @@ do_xprt_transmit(struct rpc_task *task)
if (xprt->stream) if (xprt->stream)
xprt_disconnect(xprt); xprt_disconnect(xprt);
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
goto out_release;
} }
out_release:
spin_lock_bh(&xprt->sock_lock);
__xprt_release_write(xprt, task);
__xprt_put_cong(xprt, req);
spin_unlock_bh(&xprt->sock_lock);
return;
out_receive: out_receive:
dprintk("RPC: %4d xmit complete\n", task->tk_pid); dprintk("RPC: %4d xmit complete\n", task->tk_pid);
/* Set the task's receive timeout value */ /* Set the task's receive timeout value */
task->tk_timeout = req->rq_timeout.to_current; if (!xprt->nocong) {
task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
rpcproc_timer(clnt, task->tk_msg.rpc_proc));
req->rq_ntimeo = 0;
if (task->tk_timeout > req->rq_timeout.to_maxval)
task->tk_timeout = req->rq_timeout.to_maxval;
} else
task->tk_timeout = req->rq_timeout.to_current;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (!req->rq_received) if (!req->rq_received)
rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
__xprt_release_write(xprt, task);
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
out_release:
xprt_release_write(xprt, task);
} }
/* /*
...@@ -1086,9 +1179,7 @@ xprt_reserve(struct rpc_task *task) ...@@ -1086,9 +1179,7 @@ xprt_reserve(struct rpc_task *task)
if (task->tk_rqstp) if (task->tk_rqstp)
return 0; return 0;
dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n", spin_lock(&xprt->xprt_lock);
task->tk_pid, xprt->cong, xprt->cwnd);
spin_lock_bh(&xprt->xprt_lock);
xprt_reserve_status(task); xprt_reserve_status(task);
if (task->tk_rqstp) { if (task->tk_rqstp) {
task->tk_timeout = 0; task->tk_timeout = 0;
...@@ -1099,7 +1190,7 @@ xprt_reserve(struct rpc_task *task) ...@@ -1099,7 +1190,7 @@ xprt_reserve(struct rpc_task *task)
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->backlog, task, NULL, NULL); rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
} }
spin_unlock_bh(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
dprintk("RPC: %4d xprt_reserve returns %d\n", dprintk("RPC: %4d xprt_reserve returns %d\n",
task->tk_pid, task->tk_status); task->tk_pid, task->tk_status);
return task->tk_status; return task->tk_status;
...@@ -1121,18 +1212,13 @@ xprt_reserve_status(struct rpc_task *task) ...@@ -1121,18 +1212,13 @@ xprt_reserve_status(struct rpc_task *task)
} else if (task->tk_rqstp) { } else if (task->tk_rqstp) {
/* We've already been given a request slot: NOP */ /* We've already been given a request slot: NOP */
} else { } else {
if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free)) if (!(req = xprt->free))
goto out_nofree; goto out_nofree;
/* OK: There's room for us. Grab a free slot and bump /* OK: There's room for us. Grab a free slot */
* congestion value */
xprt->free = req->rq_next; xprt->free = req->rq_next;
req->rq_next = NULL; req->rq_next = NULL;
xprt->cong += RPC_CWNDSCALE;
task->tk_rqstp = req; task->tk_rqstp = req;
xprt_request_init(task, xprt); xprt_request_init(task, xprt);
if (xprt->free)
xprt_clear_backlog(xprt);
} }
return; return;
...@@ -1176,14 +1262,11 @@ xprt_release(struct rpc_task *task) ...@@ -1176,14 +1262,11 @@ xprt_release(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req; struct rpc_rqst *req;
if (xprt->snd_task == task) {
if (xprt->stream)
xprt_disconnect(xprt);
xprt_release_write(xprt, task);
}
if (!(req = task->tk_rqstp)) if (!(req = task->tk_rqstp))
return; return;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
__xprt_release_write(xprt, task);
__xprt_put_cong(xprt, req);
if (!list_empty(&req->rq_list)) if (!list_empty(&req->rq_list))
list_del(&req->rq_list); list_del(&req->rq_list);
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
...@@ -1192,15 +1275,12 @@ xprt_release(struct rpc_task *task) ...@@ -1192,15 +1275,12 @@ xprt_release(struct rpc_task *task)
dprintk("RPC: %4d release request %p\n", task->tk_pid, req); dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
spin_lock_bh(&xprt->xprt_lock); spin_lock(&xprt->xprt_lock);
req->rq_next = xprt->free; req->rq_next = xprt->free;
xprt->free = req; xprt->free = req;
/* Decrease congestion value. */
xprt->cong -= RPC_CWNDSCALE;
xprt_clear_backlog(xprt); xprt_clear_backlog(xprt);
spin_unlock_bh(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
} }
/* /*
...@@ -1256,7 +1336,6 @@ xprt_setup(struct socket *sock, int proto, ...@@ -1256,7 +1336,6 @@ xprt_setup(struct socket *sock, int proto,
xprt->nocong = 1; xprt->nocong = 1;
} else } else
xprt->cwnd = RPC_INITCWND; xprt->cwnd = RPC_INITCWND;
xprt->congtime = jiffies;
spin_lock_init(&xprt->sock_lock); spin_lock_init(&xprt->sock_lock);
spin_lock_init(&xprt->xprt_lock); spin_lock_init(&xprt->xprt_lock);
init_waitqueue_head(&xprt->cong_wait); init_waitqueue_head(&xprt->cong_wait);
...@@ -1273,6 +1352,7 @@ xprt_setup(struct socket *sock, int proto, ...@@ -1273,6 +1352,7 @@ xprt_setup(struct socket *sock, int proto,
INIT_RPC_WAITQ(&xprt->pending, "xprt_pending"); INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
INIT_RPC_WAITQ(&xprt->sending, "xprt_sending"); INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog"); INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
/* initialize free list */ /* initialize free list */
...@@ -1404,6 +1484,7 @@ xprt_shutdown(struct rpc_xprt *xprt) ...@@ -1404,6 +1484,7 @@ xprt_shutdown(struct rpc_xprt *xprt)
{ {
xprt->shutdown = 1; xprt->shutdown = 1;
rpc_wake_up(&xprt->sending); rpc_wake_up(&xprt->sending);
rpc_wake_up(&xprt->resend);
rpc_wake_up(&xprt->pending); rpc_wake_up(&xprt->pending);
rpc_wake_up(&xprt->backlog); rpc_wake_up(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait)) if (waitqueue_active(&xprt->cong_wait))
...@@ -1415,8 +1496,6 @@ xprt_shutdown(struct rpc_xprt *xprt) ...@@ -1415,8 +1496,6 @@ xprt_shutdown(struct rpc_xprt *xprt)
*/ */
int int
xprt_clear_backlog(struct rpc_xprt *xprt) { xprt_clear_backlog(struct rpc_xprt *xprt) {
if (RPCXPRT_CONGESTED(xprt))
return 0;
rpc_wake_up_next(&xprt->backlog); rpc_wake_up_next(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait)) if (waitqueue_active(&xprt->cong_wait))
wake_up(&xprt->cong_wait); wake_up(&xprt->cong_wait);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment