Commit 4722b1cb authored by Dmitriy Vyukov

runtime: use lock-free ring for work queues

Use lock-free fixed-size ring for work queues
instead of an unbounded mutex-protected array.
The ring has single producer and multiple consumers.
If the ring overflows, work is put onto global queue.

benchmark              old ns/op    new ns/op    delta
BenchmarkMatmult               7            5  -18.12%
BenchmarkMatmult-4             2            2  -18.98%
BenchmarkMatmult-16            1            0  -12.84%

BenchmarkCreateGoroutines                     105           88  -16.10%
BenchmarkCreateGoroutines-4                   376          219  -41.76%
BenchmarkCreateGoroutines-16                  241          174  -27.80%
BenchmarkCreateGoroutinesParallel             103           87  -14.66%
BenchmarkCreateGoroutinesParallel-4           169          143  -15.38%
BenchmarkCreateGoroutinesParallel-16          158          151   -4.43%

R=golang-codereviews, rsc
CC=ddetlefs, devon.odell, golang-codereviews
https://golang.org/cl/46170044
parent 2af7a26f
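The commit message above describes the scheme: each P's run queue becomes a fixed-size ring indexed by two free-running unsigned counters, where only the owner P advances the tail (single producer) and any P may advance the head with a compare-and-swap (multiple consumers); when the ring is full, work spills to the global queue. A minimal standalone sketch of that idea in C11 atomics follows. It is illustrative only: the names Ring, ring_put, ring_get, global_enqueue and the constant RING_SIZE are assumptions of the sketch, not identifiers from the runtime code in the diff below.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 256                      /* ring capacity; indices wrap via modulo */

typedef struct Ring {
	atomic_uint head;                  /* advanced by any consumer with CAS */
	atomic_uint tail;                  /* advanced only by the owner thread */
	void *buf[RING_SIZE];
} Ring;

/* Fallback for the full case; stands in for the global run queue. */
void global_enqueue(void *item);

/* Called only by the owner thread (single producer). */
static bool ring_put(Ring *r, void *item)
{
	unsigned h = atomic_load_explicit(&r->head, memory_order_acquire);
	unsigned t = atomic_load_explicit(&r->tail, memory_order_relaxed);
	if (t - h < RING_SIZE) {                   /* ring not full */
		r->buf[t % RING_SIZE] = item;
		/* release: the slot is written before the new tail becomes visible */
		atomic_store_explicit(&r->tail, t + 1, memory_order_release);
		return true;
	}
	global_enqueue(item);                      /* overflow: spill to the global queue */
	return false;
}

/* Callable from any thread (multiple consumers). */
static void *ring_get(Ring *r)
{
	for (;;) {
		unsigned h = atomic_load_explicit(&r->head, memory_order_acquire);
		unsigned t = atomic_load_explicit(&r->tail, memory_order_acquire);
		if (t == h)
			return NULL;               /* empty */
		void *item = r->buf[h % RING_SIZE];
		/* CAS claims the slot; on failure another consumer won the race */
		if (atomic_compare_exchange_weak_explicit(&r->head, &h, h + 1,
				memory_order_acq_rel, memory_order_relaxed))
			return item;
	}
}

The actual change goes further than this sketch: on overflow, runqputslow (below) moves half of the local ring to the global queue in a single batch rather than spilling one element, so the cost of taking the global scheduler lock is amortized over many goroutines.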
@@ -79,7 +79,7 @@ static int32 newprocs;
 void runtime·mstart(void);
 static void runqput(P*, G*);
 static G* runqget(P*);
-static void runqgrow(P*);
+static bool runqputslow(P*, G*, uint32, uint32);
 static G* runqsteal(P*, P*);
 static void mput(M*);
 static M* mget(void);
@@ -106,6 +106,7 @@ static void gfput(P*, G*);
 static G* gfget(P*);
 static void gfpurge(P*);
 static void globrunqput(G*);
+static void globrunqputbatch(G*, G*, int32);
 static G* globrunqget(P*, int32);
 static P* pidleget(void);
 static void pidleput(P*);
@@ -2215,27 +2216,26 @@ procresize(int32 new)
 			else
 				p->mcache = runtime·allocmcache();
 		}
-		if(p->runq == nil) {
-			p->runqsize = 128;
-			p->runq = (G**)runtime·mallocgc(p->runqsize*sizeof(G*), 0, FlagNoInvokeGC);
-		}
 	}
 
 	// redistribute runnable G's evenly
+	// collect all runnable goroutines in global queue
 	for(i = 0; i < old; i++) {
 		p = runtime·allp[i];
 		while(gp = runqget(p))
 			globrunqput(gp);
 	}
+	// fill local queues with at most nelem(p->runq)/2 goroutines
 	// start at 1 because current M already executes some G and will acquire allp[0] below,
 	// so if we have a spare G we want to put it into allp[1].
-	for(i = 1; runtime·sched.runqhead; i++) {
+	for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) {
 		gp = runtime·sched.runqhead;
 		runtime·sched.runqhead = gp->schedlink;
-		if(runtime·sched.runqhead == nil)
-			runtime·sched.runqtail = nil;
-		runtime·sched.runqsize--;
 		runqput(runtime·allp[i%new], gp);
 	}
+	runtime·sched.runqtail = nil;
+	runtime·sched.runqsize = 0;
 
 	// free unused P's
 	for(i = new; i < old; i++) {
@@ -2524,7 +2524,7 @@ runtime·schedtrace(bool detailed)
 	static int64 starttime;
 	int64 now;
 	int64 id1, id2, id3;
-	int32 i, q, t, h, s;
+	int32 i, t, h;
 	int8 *fmt;
 	M *mp, *lockedm;
 	G *gp, *lockedg;
@@ -2551,15 +2551,11 @@ runtime·schedtrace(bool detailed)
 		if(p == nil)
 			continue;
 		mp = p->m;
-		t = p->runqtail;
-		h = p->runqhead;
-		s = p->runqsize;
-		q = t - h;
-		if(q < 0)
-			q += s;
+		h = runtime·atomicload(&p->runqhead);
+		t = runtime·atomicload(&p->runqtail);
 		if(detailed)
-			runtime·printf(" P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d/%d gfreecnt=%d\n",
-				i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, q, s, p->gfreecnt);
+			runtime·printf(" P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d gfreecnt=%d\n",
+				i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, t-h, p->gfreecnt);
 		else {
 			// In non-detailed mode format lengths of per-P run queues as:
 			// [len1 len2 len3 len4]
@@ -2570,7 +2566,7 @@ runtime·schedtrace(bool detailed)
 				fmt = " [%d";
 			else if(i == runtime·gomaxprocs-1)
 				fmt = " %d]\n";
-			runtime·printf(fmt, q);
+			runtime·printf(fmt, t-h);
 		}
 	}
 	if(!detailed) {
@@ -2645,6 +2641,20 @@ globrunqput(G *gp)
 	runtime·sched.runqsize++;
 }
 
+// Put a batch of runnable goroutines on the global runnable queue.
+// Sched must be locked.
+static void
+globrunqputbatch(G *ghead, G *gtail, int32 n)
+{
+	gtail->schedlink = nil;
+	if(runtime·sched.runqtail)
+		runtime·sched.runqtail->schedlink = ghead;
+	else
+		runtime·sched.runqhead = ghead;
+	runtime·sched.runqtail = gtail;
+	runtime·sched.runqsize += n;
+}
+
 // Try get a batch of G's from the global runnable queue.
 // Sched must be locked.
 static G*
@@ -2660,6 +2670,8 @@ globrunqget(P *p, int32 max)
 	n = runtime·sched.runqsize;
 	if(max > 0 && n > max)
 		n = max;
+	if(n > nelem(p->runq)/2)
+		n = nelem(p->runq)/2;
 	runtime·sched.runqsize -= n;
 	if(runtime·sched.runqsize == 0)
 		runtime·sched.runqtail = nil;
@@ -2699,78 +2711,98 @@ pidleget(void)
 	return p;
 }
 
-// Put g on local runnable queue.
-// TODO(dvyukov): consider using lock-free queue.
+// Try to put g on local runnable queue.
+// If it's full, put onto global queue.
+// Executed only by the owner P.
 static void
 runqput(P *p, G *gp)
 {
-	int32 h, t, s;
+	uint32 h, t;
 
-	runtime·lock(p);
 retry:
-	h = p->runqhead;
+	h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers
 	t = p->runqtail;
-	s = p->runqsize;
-	if(t == h-1 || (h == 0 && t == s-1)) {
-		runqgrow(p);
-		goto retry;
+	if(t - h < nelem(p->runq)) {
+		p->runq[t%nelem(p->runq)] = gp;
+		runtime·atomicstore(&p->runqtail, t+1); // store-release, makes the item available for consumption
+		return;
 	}
-	p->runq[t++] = gp;
-	if(t == s)
-		t = 0;
-	p->runqtail = t;
-	runtime·unlock(p);
+	if(runqputslow(p, gp, h, t))
+		return;
+	// the queue is not full, now the put above must succeed
+	goto retry;
 }
+
+// Put g and a batch of work from local runnable queue on global queue.
+// Executed only by the owner P.
+static bool
+runqputslow(P *p, G *gp, uint32 h, uint32 t)
+{
+	G *batch[nelem(p->runq)/2+1];
+	uint32 n, i;
+
+	// First, grab a batch from local queue.
+	n = t-h;
+	n = n/2;
+	if(n != nelem(p->runq)/2)
+		runtime·throw("runqputslow: queue is not full");
+	for(i=0; i<n; i++)
+		batch[i] = p->runq[(h+i)%nelem(p->runq)];
+	if(!runtime·cas(&p->runqhead, h, h+n)) // cas-release, commits consume
+		return false;
+	batch[n] = gp;
+	// Link the goroutines.
+	for(i=0; i<n; i++)
+		batch[i]->schedlink = batch[i+1];
+	// Now put the batch on global queue.
+	runtime·lock(&runtime·sched);
+	globrunqputbatch(batch[0], batch[n], n+1);
+	runtime·unlock(&runtime·sched);
+	return true;
+}
 
 // Get g from local runnable queue.
+// Executed only by the owner P.
 static G*
 runqget(P *p)
 {
 	G *gp;
-	int32 t, h, s;
+	uint32 t, h;
 
-	if(p->runqhead == p->runqtail)
-		return nil;
-	runtime·lock(p);
-	h = p->runqhead;
-	t = p->runqtail;
-	s = p->runqsize;
-	if(t == h) {
-		runtime·unlock(p);
-		return nil;
+	for(;;) {
+		h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
+		t = p->runqtail;
+		if(t == h)
+			return nil;
+		gp = p->runq[h%nelem(p->runq)];
+		if(runtime·cas(&p->runqhead, h, h+1)) // cas-release, commits consume
+			return gp;
 	}
-	gp = p->runq[h++];
-	if(h == s)
-		h = 0;
-	p->runqhead = h;
-	runtime·unlock(p);
-	return gp;
 }
 
-// Grow local runnable queue.
-// TODO(dvyukov): consider using fixed-size array
-// and transfer excess to the global list (local queue can grow way too big).
-static void
-runqgrow(P *p)
+// Grabs a batch of goroutines from local runnable queue.
+// batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines.
+// Can be executed by any P.
+static uint32
+runqgrab(P *p, G **batch)
 {
-	G **q;
-	int32 s, t, h, t2;
+	uint32 t, h, n, i;
 
-	h = p->runqhead;
-	t = p->runqtail;
-	s = p->runqsize;
-	t2 = 0;
-	q = runtime·malloc(2*s*sizeof(*q));
-	while(t != h) {
-		q[t2++] = p->runq[h++];
-		if(h == s)
-			h = 0;
+	for(;;) {
+		h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
+		t = runtime·atomicload(&p->runqtail); // load-acquire, synchronize with the producer
+		n = t-h;
+		n = n - n/2;
+		if(n == 0)
+			break;
+		if(n > nelem(p->runq)/2) // read inconsistent h and t
+			continue;
+		for(i=0; i<n; i++)
+			batch[i] = p->runq[(h+i)%nelem(p->runq)];
+		if(runtime·cas(&p->runqhead, h, h+n)) // cas-release, commits consume
+			break;
 	}
-	runtime·free(p->runq);
-	p->runq = q;
-	p->runqhead = 0;
-	p->runqtail = t2;
-	p->runqsize = 2*s;
+	return n;
 }
 
 // Steal half of elements from local runnable queue of p2
@@ -2779,57 +2811,24 @@ runqgrow(P *p)
 static G*
 runqsteal(P *p, P *p2)
 {
-	G *gp, *gp1;
-	int32 t, h, s, t2, h2, s2, c, i;
+	G *gp;
+	G *batch[nelem(p->runq)/2];
+	uint32 t, h, n, i;
 
-	if(p2->runqhead == p2->runqtail)
+	n = runqgrab(p2, batch);
+	if(n == 0)
 		return nil;
-	// sort locks to prevent deadlocks
-	if(p < p2)
-		runtime·lock(p);
-	runtime·lock(p2);
-	if(p2->runqhead == p2->runqtail) {
-		runtime·unlock(p2);
-		if(p < p2)
-			runtime·unlock(p);
-		return nil;
-	}
-	if(p >= p2)
-		runtime·lock(p);
-	// now we've locked both queues and know the victim is not empty
-	h = p->runqhead;
+	n--;
+	gp = batch[n];
+	if(n == 0)
+		return gp;
+	h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers
 	t = p->runqtail;
-	s = p->runqsize;
-	h2 = p2->runqhead;
-	t2 = p2->runqtail;
-	s2 = p2->runqsize;
-	gp = p2->runq[h2++]; // return value
-	if(h2 == s2)
-		h2 = 0;
-	// steal roughly half
-	if(t2 > h2)
-		c = (t2 - h2) / 2;
-	else
-		c = (s2 - h2 + t2) / 2;
-	// copy
-	for(i = 0; i != c; i++) {
-		// the target queue is full?
-		if(t == h-1 || (h == 0 && t == s-1))
-			break;
-		// the victim queue is empty?
-		if(t2 == h2)
-			break;
-		gp1 = p2->runq[h2++];
-		if(h2 == s2)
-			h2 = 0;
-		p->runq[t++] = gp1;
-		if(t == s)
-			t = 0;
-	}
-	p->runqtail = t;
-	p2->runqhead = h2;
-	runtime·unlock(p2);
-	runtime·unlock(p);
+	if(t - h + n >= nelem(p->runq))
+		runtime·throw("runqsteal: runq overflow");
+	for(i=0; i<n; i++, t++)
+		p->runq[t%nelem(p->runq)] = batch[i];
+	runtime·atomicstore(&p->runqtail, t); // store-release, makes the item available for consumption
 	return gp;
 }
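The stealing path just above consumes in batches: runqgrab snapshots head and tail, copies up to half of the victim's ring, and commits the whole grab with one CAS on head, retrying on a torn snapshot or a lost race. A hedged sketch of that batched grab, reusing the illustrative Ring type from the sketch before the diff (grab_half and its parameters are likewise assumptions of the sketch, not runtime identifiers):

/* Take roughly half of r into batch (caller provides space for at least
 * RING_SIZE/2 entries); callable from any thread. */
static unsigned grab_half(Ring *r, void **batch)
{
	for (;;) {
		unsigned h = atomic_load_explicit(&r->head, memory_order_acquire);
		unsigned t = atomic_load_explicit(&r->tail, memory_order_acquire);
		unsigned n = t - h;
		n = n - n / 2;                     /* the larger half, leave the rest */
		if (n == 0)
			return 0;                  /* victim ring is empty */
		if (n > RING_SIZE / 2)             /* inconsistent head/tail snapshot */
			continue;
		for (unsigned i = 0; i < n; i++)
			batch[i] = r->buf[(h + i) % RING_SIZE];
		/* one CAS commits the whole batch; on failure another consumer
		 * raced us, so take a fresh snapshot and retry */
		if (atomic_compare_exchange_weak_explicit(&r->head, &h, h + n,
				memory_order_acq_rel, memory_order_relaxed))
			return n;
	}
}

A single CAS per batch is what lets a thief move many goroutines at once while competing consumers simply retry rather than block, matching the runqgrab/runqsteal pair in the hunk above.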
@@ -2837,14 +2836,10 @@ void
 runtime·testSchedLocalQueue(void)
 {
 	P p;
-	G gs[1000];
+	G gs[nelem(p.runq)];
 	int32 i, j;
 
 	runtime·memclr((byte*)&p, sizeof(p));
-	p.runqsize = 1;
-	p.runqhead = 0;
-	p.runqtail = 0;
-	p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq));
 
 	for(i = 0; i < nelem(gs); i++) {
 		if(runqget(&p) != nil)
@@ -2866,20 +2861,11 @@ void
 runtime·testSchedLocalQueueSteal(void)
 {
 	P p1, p2;
-	G gs[1000], *gp;
+	G gs[nelem(p1.runq)], *gp;
 	int32 i, j, s;
 
 	runtime·memclr((byte*)&p1, sizeof(p1));
-	p1.runqsize = 1;
-	p1.runqhead = 0;
-	p1.runqtail = 0;
-	p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq));
-
 	runtime·memclr((byte*)&p2, sizeof(p2));
-	p2.runqsize = nelem(gs);
-	p2.runqhead = 0;
-	p2.runqtail = 0;
-	p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq));
 
 	for(i = 0; i < nelem(gs); i++) {
 		for(j = 0; j < i; j++) {
@@ -373,10 +373,9 @@ struct P
 	MCache*	mcache;
 
 	// Queue of runnable goroutines.
-	G**	runq;
-	int32	runqhead;
-	int32	runqtail;
-	int32	runqsize;
+	uint32	runqhead;
+	uint32	runqtail;
+	G*	runq[256];
 
 	// Available G's (status == Gdead)
 	G*	gfree;