Commit 1eedbd78 authored by Trond Myklebust's avatar Trond Myklebust

[PATCH] 2.5.25 Clean up RPC receive code

  Divorces task xid<->request slot mapping from the internals of the
rpc_waitqueue xprt->pending. Instead xprt_lookup_rqst() is made to
search a dedicated list (xprt->recv) on which the request slot is
placed immediately after being allocated to a task. The new queue is
protected using the spinlock xprt->sock_lock rather than the generic
RPC task lock.

  Both udp_data_ready() and tcp_data_ready() (well tcp_read_request()
actually) now need to protect against the request being removed from
the xprt->recv list while they copy the RPC reply data from the skb.
On the other hand, they no longer need to worry about the task
disappearing from xprt->pending. This means that rpc_lock_task() hack
can be replaced by the spinlock xprt->sock_lock.
parent 45bde054
...@@ -83,7 +83,9 @@ struct rpc_rqst { ...@@ -83,7 +83,9 @@ struct rpc_rqst {
struct rpc_task * rq_task; /* RPC task data */ struct rpc_task * rq_task; /* RPC task data */
__u32 rq_xid; /* request XID */ __u32 rq_xid; /* request XID */
struct rpc_rqst * rq_next; /* free list */ struct rpc_rqst * rq_next; /* free list */
volatile unsigned char rq_received : 1;/* receive completed */ int rq_received; /* receive completed */
struct list_head rq_list;
/* /*
* For authentication (e.g. auth_des) * For authentication (e.g. auth_des)
...@@ -149,6 +151,8 @@ struct rpc_xprt { ...@@ -149,6 +151,8 @@ struct rpc_xprt {
spinlock_t xprt_lock; /* lock xprt info */ spinlock_t xprt_lock; /* lock xprt info */
struct rpc_task * snd_task; /* Task blocked in send */ struct rpc_task * snd_task; /* Task blocked in send */
struct list_head recv;
void (*old_data_ready)(struct sock *, int); void (*old_data_ready)(struct sock *, int);
void (*old_state_change)(struct sock *); void (*old_state_change)(struct sock *);
......
...@@ -595,13 +595,16 @@ call_status(struct rpc_task *task) ...@@ -595,13 +595,16 @@ call_status(struct rpc_task *task)
dprintk("RPC: %4d call_status (status %d)\n", dprintk("RPC: %4d call_status (status %d)\n",
task->tk_pid, task->tk_status); task->tk_pid, task->tk_status);
req = task->tk_rqstp;
if (req->rq_received != 0)
status = req->rq_received;
if (status >= 0) { if (status >= 0) {
req->rq_received = 0;
task->tk_action = call_decode; task->tk_action = call_decode;
return; return;
} }
task->tk_status = 0; task->tk_status = 0;
req = task->tk_rqstp;
switch(status) { switch(status) {
case -ETIMEDOUT: case -ETIMEDOUT:
task->tk_action = call_timeout; task->tk_action = call_timeout;
......
...@@ -68,8 +68,6 @@ ...@@ -68,8 +68,6 @@
#include <asm/uaccess.h> #include <asm/uaccess.h>
extern spinlock_t rpc_queue_lock;
/* /*
* Local variables * Local variables
*/ */
...@@ -465,20 +463,16 @@ xprt_reconn_status(struct rpc_task *task) ...@@ -465,20 +463,16 @@ xprt_reconn_status(struct rpc_task *task)
static inline struct rpc_rqst * static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{ {
struct rpc_rqst *req; struct list_head *pos;
struct list_head *le; struct rpc_rqst *req = NULL;
struct rpc_task *task;
spin_lock_bh(&rpc_queue_lock); list_for_each(pos, &xprt->recv) {
task_for_each(task, le, &xprt->pending.tasks) struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
if ((req = task->tk_rqstp) && req->rq_xid == xid) if (entry->rq_xid == xid) {
goto out; req = entry;
dprintk("RPC: unknown XID %08x in reply.\n", xid); break;
req = NULL; }
out: }
if (req && !__rpc_lock_task(req->rq_task))
req = NULL;
spin_unlock_bh(&rpc_queue_lock);
return req; return req;
} }
...@@ -515,7 +509,7 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) ...@@ -515,7 +509,7 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
task->tk_status = copied; task->tk_status = copied;
req->rq_received = 1; req->rq_received = copied;
/* ... and wake up the process. */ /* ... and wake up the process. */
rpc_wake_up_task(task); rpc_wake_up_task(task);
...@@ -613,9 +607,10 @@ udp_data_ready(struct sock *sk, int len) ...@@ -613,9 +607,10 @@ udp_data_ready(struct sock *sk, int len)
} }
/* Look up and lock the request corresponding to the given XID */ /* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->sock_lock);
rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr))); rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
if (!rovr) if (!rovr)
goto dropit; goto out_unlock;
task = rovr->rq_task; task = rovr->rq_task;
dprintk("RPC: %4d received reply\n", task->tk_pid); dprintk("RPC: %4d received reply\n", task->tk_pid);
...@@ -635,8 +630,7 @@ udp_data_ready(struct sock *sk, int len) ...@@ -635,8 +630,7 @@ udp_data_ready(struct sock *sk, int len)
xprt_complete_rqst(xprt, rovr, copied); xprt_complete_rqst(xprt, rovr, copied);
out_unlock: out_unlock:
rpc_unlock_task(task); spin_unlock(&xprt->sock_lock);
dropit: dropit:
skb_free_datagram(sk, skb); skb_free_datagram(sk, skb);
out: out:
...@@ -738,11 +732,13 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -738,11 +732,13 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
size_t len; size_t len;
/* Find and lock the request corresponding to this xid */ /* Find and lock the request corresponding to this xid */
spin_lock(&xprt->sock_lock);
req = xprt_lookup_rqst(xprt, xprt->tcp_xid); req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
if (!req) { if (!req) {
xprt->tcp_flags &= ~XPRT_COPY_DATA; xprt->tcp_flags &= ~XPRT_COPY_DATA;
dprintk("RPC: XID %08x request not found!\n", dprintk("RPC: XID %08x request not found!\n",
xprt->tcp_xid); xprt->tcp_xid);
spin_unlock(&xprt->sock_lock);
return; return;
} }
...@@ -776,7 +772,7 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -776,7 +772,7 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
req->rq_task->tk_pid); req->rq_task->tk_pid);
xprt_complete_rqst(xprt, req, xprt->tcp_copied); xprt_complete_rqst(xprt, req, xprt->tcp_copied);
} }
rpc_unlock_task(req->rq_task); spin_unlock(&xprt->sock_lock);
tcp_check_recm(xprt); tcp_check_recm(xprt);
} }
...@@ -933,16 +929,21 @@ static void ...@@ -933,16 +929,21 @@ static void
xprt_timer(struct rpc_task *task) xprt_timer(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
if (req) spin_lock(&xprt->sock_lock);
xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT); if (req->rq_received)
goto out;
xprt_adjust_cwnd(xprt, -ETIMEDOUT);
dprintk("RPC: %4d xprt_timer (%s request)\n", dprintk("RPC: %4d xprt_timer (%s request)\n",
task->tk_pid, req ? "pending" : "backlogged"); task->tk_pid, req ? "pending" : "backlogged");
task->tk_status = -ETIMEDOUT; task->tk_status = -ETIMEDOUT;
out:
task->tk_timeout = 0; task->tk_timeout = 0;
rpc_wake_up_task(task); rpc_wake_up_task(task);
spin_unlock(&xprt->sock_lock);
} }
/* /*
...@@ -995,14 +996,6 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -995,14 +996,6 @@ do_xprt_transmit(struct rpc_task *task)
int status, retry = 0; int status, retry = 0;
/* For fast networks/servers we have to put the request on
* the pending list now:
* Note that we don't want the task timing out during the
* call to xprt_sendmsg(), so we initially disable the timeout,
* and then reset it later...
*/
xprt_receive(task);
/* Continue transmitting the packet/record. We must be careful /* Continue transmitting the packet/record. We must be careful
* to cope with writespace callbacks arriving _after_ we have * to cope with writespace callbacks arriving _after_ we have
* called xprt_sendmsg(). * called xprt_sendmsg().
...@@ -1034,15 +1027,11 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1034,15 +1027,11 @@ do_xprt_transmit(struct rpc_task *task)
if (retry++ > 50) if (retry++ > 50)
break; break;
} }
rpc_unlock_task(task);
/* Note: at this point, task->tk_sleeping has not yet been set, /* Note: at this point, task->tk_sleeping has not yet been set,
* hence there is no danger of the waking up task being put on * hence there is no danger of the waking up task being put on
* schedq, and being picked up by a parallel run of rpciod(). * schedq, and being picked up by a parallel run of rpciod().
*/ */
rpc_wake_up_task(task);
if (!RPC_IS_RUNNING(task))
goto out_release;
if (req->rq_received) if (req->rq_received)
goto out_release; goto out_release;
...@@ -1077,30 +1066,14 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1077,30 +1066,14 @@ do_xprt_transmit(struct rpc_task *task)
dprintk("RPC: %4d xmit complete\n", task->tk_pid); dprintk("RPC: %4d xmit complete\n", task->tk_pid);
/* Set the task's receive timeout value */ /* Set the task's receive timeout value */
task->tk_timeout = req->rq_timeout.to_current; task->tk_timeout = req->rq_timeout.to_current;
rpc_add_timer(task, xprt_timer); spin_lock_bh(&xprt->sock_lock);
rpc_unlock_task(task); if (!req->rq_received)
rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
spin_unlock_bh(&xprt->sock_lock);
out_release: out_release:
xprt_release_write(xprt, task); xprt_release_write(xprt, task);
} }
/*
* Queue the task for a reply to our call.
* When the callback is invoked, the congestion window should have
* been updated already.
*/
void
xprt_receive(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
req->rq_received = 0;
task->tk_timeout = 0;
rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
}
/* /*
* Reserve an RPC call slot. * Reserve an RPC call slot.
*/ */
...@@ -1188,6 +1161,10 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) ...@@ -1188,6 +1161,10 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
req->rq_xid = xid++; req->rq_xid = xid++;
if (!xid) if (!xid)
xid++; xid++;
INIT_LIST_HEAD(&req->rq_list);
spin_lock_bh(&xprt->sock_lock);
list_add_tail(&req->rq_list, &xprt->recv);
spin_unlock_bh(&xprt->sock_lock);
} }
/* /*
...@@ -1206,6 +1183,10 @@ xprt_release(struct rpc_task *task) ...@@ -1206,6 +1183,10 @@ xprt_release(struct rpc_task *task)
} }
if (!(req = task->tk_rqstp)) if (!(req = task->tk_rqstp))
return; return;
spin_lock_bh(&xprt->sock_lock);
if (!list_empty(&req->rq_list))
list_del(&req->rq_list);
spin_unlock_bh(&xprt->sock_lock);
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
memset(req, 0, sizeof(*req)); /* mark unused */ memset(req, 0, sizeof(*req)); /* mark unused */
...@@ -1280,6 +1261,8 @@ xprt_setup(struct socket *sock, int proto, ...@@ -1280,6 +1261,8 @@ xprt_setup(struct socket *sock, int proto,
spin_lock_init(&xprt->xprt_lock); spin_lock_init(&xprt->xprt_lock);
init_waitqueue_head(&xprt->cong_wait); init_waitqueue_head(&xprt->cong_wait);
INIT_LIST_HEAD(&xprt->recv);
/* Set timeout parameters */ /* Set timeout parameters */
if (to) { if (to) {
xprt->timeout = *to; xprt->timeout = *to;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment