Commit 95f7691d authored by Trond Myklebust

SUNRPC: Convert xprt receive queue to use an rbtree

If the server is slow, we can find ourselves with quite a lot of entries
on the receive queue. Converting the search from an O(n) list walk to an
O(log(n)) tree lookup can make a significant difference, particularly
since we have to hold a number of locks while searching.
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
parent bd79bc57
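
To put rough numbers on that claim (illustrative figures, not from the patch): with n requests outstanding, a linear scan averages n/2 XID comparisons per incoming reply, while a red-black tree's descent is bounded by about 2*log2(n+1) comparisons. At n = 1024 that is roughly 512 versus 20 comparisons, all performed while holding xprt->queue_lock, so the savings compound as load grows.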
@@ -85,7 +85,7 @@ struct rpc_rqst {
 	union {
 		struct list_head	rq_list;	/* Slot allocation list */
-		struct list_head	rq_recv;	/* Receive queue */
+		struct rb_node		rq_recv;	/* Receive queue */
 	};
 	struct list_head	rq_xmit;	/* Send queue */
@@ -260,7 +260,7 @@ struct rpc_xprt {
 					 * backchannel rpc_rqst's */
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
-	struct list_head	recv_queue;	/* Receive queue */
+	struct rb_root		recv_queue;	/* Receive queue */
 	struct {
 		unsigned long		bind_count,	/* total number of binds */
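
A side note on the header change: rq_recv shares a union with rq_list, which stays safe because a request sits on the slot-allocation list or on the receive queue, never both at once. The swap does grow the union by one pointer-sized word, since struct rb_node packs three words against list_head's two. For reference, this is its definition in include/linux/rbtree.h in kernels of this era (shown here only for illustration):

	/* Parent pointer and node colour are packed into one word. */
	struct rb_node {
		unsigned long  __rb_parent_color;
		struct rb_node *rb_right;
		struct rb_node *rb_left;
	} __attribute__((aligned(sizeof(long))));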
...
@@ -753,7 +753,7 @@ static void
 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
 	__must_hold(&xprt->transport_lock)
 {
-	if (list_empty(&xprt->recv_queue) && xprt_has_timer(xprt))
+	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
 		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 }
@@ -763,7 +763,7 @@ xprt_init_autodisconnect(struct timer_list *t)
 	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
 	spin_lock(&xprt->transport_lock);
-	if (!list_empty(&xprt->recv_queue))
+	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
 		goto out_abort;
 	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
 	xprt->last_used = jiffies;
@@ -880,6 +880,75 @@ static void xprt_connect_status(struct rpc_task *task)
 	}
 }
 
+enum xprt_xid_rb_cmp {
+	XID_RB_EQUAL,
+	XID_RB_LEFT,
+	XID_RB_RIGHT,
+};
+
+static enum xprt_xid_rb_cmp
+xprt_xid_cmp(__be32 xid1, __be32 xid2)
+{
+	if (xid1 == xid2)
+		return XID_RB_EQUAL;
+	if ((__force u32)xid1 < (__force u32)xid2)
+		return XID_RB_LEFT;
+	return XID_RB_RIGHT;
+}
+
+static struct rpc_rqst *
+xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
+{
+	struct rb_node *n = xprt->recv_queue.rb_node;
+	struct rpc_rqst *req;
+
+	while (n != NULL) {
+		req = rb_entry(n, struct rpc_rqst, rq_recv);
+		switch (xprt_xid_cmp(xid, req->rq_xid)) {
+		case XID_RB_LEFT:
+			n = n->rb_left;
+			break;
+		case XID_RB_RIGHT:
+			n = n->rb_right;
+			break;
+		case XID_RB_EQUAL:
+			return req;
+		}
+	}
+	return NULL;
+}
+
+static void
+xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
+{
+	struct rb_node **p = &xprt->recv_queue.rb_node;
+	struct rb_node *n = NULL;
+	struct rpc_rqst *req;
+
+	while (*p != NULL) {
+		n = *p;
+		req = rb_entry(n, struct rpc_rqst, rq_recv);
+		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
+		case XID_RB_LEFT:
+			p = &n->rb_left;
+			break;
+		case XID_RB_RIGHT:
+			p = &n->rb_right;
+			break;
+		case XID_RB_EQUAL:
+			WARN_ON_ONCE(new != req);
+			return;
+		}
+	}
+	rb_link_node(&new->rq_recv, n, p);
+	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
+}
+
+static void
+xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	rb_erase(&req->rq_recv, &xprt->recv_queue);
+}
+
 /**
  * xprt_lookup_rqst - find an RPC request corresponding to an XID
  * @xprt: transport on which the original request was transmitted
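
Taken together, the three helpers give the receive queue ordinary map semantics keyed by XID. Note the (__force u32) casts in xprt_xid_cmp(): they strip the __be32 endianness annotation for sparse, and the resulting numeric order is arbitrary but total — which is all an rbtree needs, since nothing ever walks this tree in key order. A minimal sketch of how find and remove pair up under xprt->queue_lock (hypothetical illustration; example_reply_to_request is not part of the patch):

	/* Hypothetical driver, not in the patch: locate the rpc_rqst
	 * matching a reply's XID and detach it from the receive queue. */
	static struct rpc_rqst *
	example_reply_to_request(struct rpc_xprt *xprt, __be32 xid)
	{
		struct rpc_rqst *req;

		spin_lock(&xprt->queue_lock);
		req = xprt_request_rb_find(xprt, xid);	/* O(log n) descent */
		if (req != NULL)
			xprt_request_rb_remove(xprt, req); /* caller owns it now */
		spin_unlock(&xprt->queue_lock);
		return req;
	}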
@@ -891,12 +960,12 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
 	struct rpc_rqst *entry;
 
-	list_for_each_entry(entry, &xprt->recv_queue, rq_recv)
-		if (entry->rq_xid == xid) {
-			trace_xprt_lookup_rqst(xprt, xid, 0);
-			entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
-			return entry;
-		}
+	entry = xprt_request_rb_find(xprt, xid);
+	if (entry != NULL) {
+		trace_xprt_lookup_rqst(xprt, xid, 0);
+		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
+		return entry;
+	}
 
 	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
 			ntohl(xid));
@@ -981,7 +1050,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 			sizeof(req->rq_private_buf));
 
 	/* Add request to the receive list */
-	list_add_tail(&req->rq_recv, &xprt->recv_queue);
+	xprt_request_rb_insert(xprt, req);
 	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
 	spin_unlock(&xprt->queue_lock);
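
One behavioural note on this hunk: list_add_tail() kept the receive queue in transmission order, whereas the rbtree keeps it in XID order. Nothing appears to consume the queue in FIFO order — lookups are by XID and the remaining uses are emptiness checks — which is why the data structure can change with so few call-site edits.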
@@ -999,8 +1068,10 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 static void
 xprt_request_dequeue_receive_locked(struct rpc_task *task)
 {
+	struct rpc_rqst *req = task->tk_rqstp;
+
 	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
-		list_del(&task->tk_rqstp->rq_recv);
+		xprt_request_rb_remove(req->rq_xprt, req);
 }
 
 /**
@@ -1711,7 +1782,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 	spin_lock_init(&xprt->queue_lock);
 	INIT_LIST_HEAD(&xprt->free);
-	INIT_LIST_HEAD(&xprt->recv_queue);
+	xprt->recv_queue = RB_ROOT;
 	INIT_LIST_HEAD(&xprt->xmit_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	spin_lock_init(&xprt->bc_pa_lock);
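
This initialisation change is the counterpart of the RB_EMPTY_ROOT() checks earlier in the patch: both rely on an empty rbtree being nothing more than a NULL root pointer. For reference, the macros in include/linux/rbtree.h look roughly like this (the exact RB_EMPTY_ROOT body varies slightly across kernel versions):

	#define RB_ROOT (struct rb_root) { NULL, }
	#define RB_EMPTY_ROOT(root) ((root)->rb_node == NULL)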
...