From 4722b1cbd3c734b67c0e3c1cd4458cdbd51e5844 Mon Sep 17 00:00:00 2001
From: Dmitriy Vyukov <dvyukov@google.com>
Date: Thu, 16 Jan 2014 12:17:00 +0400
Subject: [PATCH] runtime: use lock-free ring for work queues

Use a lock-free fixed-size ring for work queues instead of an
unbounded mutex-protected array. The ring has a single producer and
multiple consumers. If the ring overflows, work is put onto the
global queue.

benchmark                             old ns/op    new ns/op    delta
BenchmarkMatmult                      7            5            -18.12%
BenchmarkMatmult-4                    2            2            -18.98%
BenchmarkMatmult-16                   1            0            -12.84%

BenchmarkCreateGoroutines             105          88           -16.10%
BenchmarkCreateGoroutines-4           376          219          -41.76%
BenchmarkCreateGoroutines-16          241          174          -27.80%

BenchmarkCreateGoroutinesParallel     103          87           -14.66%
BenchmarkCreateGoroutinesParallel-4   169          143          -15.38%
BenchmarkCreateGoroutinesParallel-16  158          151          -4.43%

R=golang-codereviews, rsc
CC=ddetlefs, devon.odell, golang-codereviews
https://golang.org/cl/46170044
---
 src/pkg/runtime/proc.c    | 250 ++++++++++++++++++--------------------
 src/pkg/runtime/runtime.h |   7 +-
 2 files changed, 121 insertions(+), 136 deletions(-)

diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index b4bc72b5b7..bc371260fc 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -79,7 +79,7 @@ static int32 newprocs;
 void runtime·mstart(void);
 static void runqput(P*, G*);
 static G* runqget(P*);
-static void runqgrow(P*);
+static bool runqputslow(P*, G*, uint32, uint32);
 static G* runqsteal(P*, P*);
 static void mput(M*);
 static M* mget(void);
@@ -106,6 +106,7 @@ static void gfput(P*, G*);
 static G* gfget(P*);
 static void gfpurge(P*);
 static void globrunqput(G*);
+static void globrunqputbatch(G*, G*, int32);
 static G* globrunqget(P*, int32);
 static P* pidleget(void);
 static void pidleput(P*);
@@ -2215,27 +2216,26 @@ procresize(int32 new)
 			else
 				p->mcache = runtime·allocmcache();
 		}
-		if(p->runq == nil) {
-			p->runqsize = 128;
-			p->runq = (G**)runtime·mallocgc(p->runqsize*sizeof(G*), 0, FlagNoInvokeGC);
-		}
 	}
 
 	// redistribute runnable G's evenly
+	// collect all runnable goroutines in global queue
 	for(i = 0; i < old; i++) {
 		p = runtime·allp[i];
 		while(gp = runqget(p))
 			globrunqput(gp);
 	}
+	// fill local queues with at most nelem(p->runq)/2 goroutines
 	// start at 1 because current M already executes some G and will acquire allp[0] below,
 	// so if we have a spare G we want to put it into allp[1].
-	for(i = 1; runtime·sched.runqhead; i++) {
+	for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) {
 		gp = runtime·sched.runqhead;
 		runtime·sched.runqhead = gp->schedlink;
+		if(runtime·sched.runqhead == nil)
+			runtime·sched.runqtail = nil;
+		runtime·sched.runqsize--;
 		runqput(runtime·allp[i%new], gp);
 	}
-	runtime·sched.runqtail = nil;
-	runtime·sched.runqsize = 0;
 
 	// free unused P's
 	for(i = new; i < old; i++) {
@@ -2524,7 +2524,7 @@ runtime·schedtrace(bool detailed)
 	static int64 starttime;
 	int64 now;
 	int64 id1, id2, id3;
-	int32 i, q, t, h, s;
+	int32 i, t, h;
 	int8 *fmt;
 	M *mp, *lockedm;
 	G *gp, *lockedg;
@@ -2551,15 +2551,11 @@ runtime·schedtrace(bool detailed)
 		if(p == nil)
 			continue;
 		mp = p->m;
-		t = p->runqtail;
-		h = p->runqhead;
-		s = p->runqsize;
-		q = t - h;
-		if(q < 0)
-			q += s;
+		h = runtime·atomicload(&p->runqhead);
+		t = runtime·atomicload(&p->runqtail);
 		if(detailed)
-			runtime·printf("  P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d/%d gfreecnt=%d\n",
-				i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, q, s, p->gfreecnt);
+			runtime·printf("  P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d gfreecnt=%d\n",
+				i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, t-h, p->gfreecnt);
 		else {
 			// In non-detailed mode format lengths of per-P run queues as:
 			// [len1 len2 len3 len4]
@@ -2570,7 +2566,7 @@ runtime·schedtrace(bool detailed)
 				fmt = " [%d";
 			else if(i == runtime·gomaxprocs-1)
 				fmt = " %d]\n";
-			runtime·printf(fmt, q);
+			runtime·printf(fmt, t-h);
 		}
 	}
 	if(!detailed) {
@@ -2645,6 +2641,20 @@ globrunqput(G *gp)
 	runtime·sched.runqsize++;
 }
 
+// Put a batch of runnable goroutines on the global runnable queue.
+// Sched must be locked.
+static void
+globrunqputbatch(G *ghead, G *gtail, int32 n)
+{
+	gtail->schedlink = nil;
+	if(runtime·sched.runqtail)
+		runtime·sched.runqtail->schedlink = ghead;
+	else
+		runtime·sched.runqhead = ghead;
+	runtime·sched.runqtail = gtail;
+	runtime·sched.runqsize += n;
+}
+
 // Try get a batch of G's from the global runnable queue.
 // Sched must be locked.
 static G*
@@ -2660,6 +2670,8 @@ globrunqget(P *p, int32 max)
 	n = runtime·sched.runqsize;
 	if(max > 0 && n > max)
 		n = max;
+	if(n > nelem(p->runq)/2)
+		n = nelem(p->runq)/2;
 	runtime·sched.runqsize -= n;
 	if(runtime·sched.runqsize == 0)
 		runtime·sched.runqtail = nil;
@@ -2699,78 +2711,98 @@ pidleget(void)
 	return p;
 }
 
-// Put g on local runnable queue.
-// TODO(dvyukov): consider using lock-free queue.
+// Try to put g on local runnable queue.
+// If it's full, put onto global queue.
+// Executed only by the owner P.
 static void
 runqput(P *p, G *gp)
 {
-	int32 h, t, s;
+	uint32 h, t;
 
-	runtime·lock(p);
retry:
-	h = p->runqhead;
+	h = runtime·atomicload(&p->runqhead);	// load-acquire, synchronize with consumers
 	t = p->runqtail;
-	s = p->runqsize;
-	if(t == h-1 || (h == 0 && t == s-1)) {
-		runqgrow(p);
-		goto retry;
+	if(t - h < nelem(p->runq)) {
+		p->runq[t%nelem(p->runq)] = gp;
+		runtime·atomicstore(&p->runqtail, t+1);	// store-release, makes the item available for consumption
+		return;
 	}
-	p->runq[t++] = gp;
-	if(t == s)
-		t = 0;
-	p->runqtail = t;
-	runtime·unlock(p);
+	if(runqputslow(p, gp, h, t))
+		return;
+	// the queue is not full, now the put above must succeed
+	goto retry;
+}
+
+// Put g and a batch of work from local runnable queue on global queue.
+// Executed only by the owner P.
+static bool
+runqputslow(P *p, G *gp, uint32 h, uint32 t)
+{
+	G *batch[nelem(p->runq)/2+1];
+	uint32 n, i;
+
+	// First, grab a batch from local queue.
+	n = t-h;
+	n = n/2;
+	if(n != nelem(p->runq)/2)
+		runtime·throw("runqputslow: queue is not full");
+	for(i=0; i<n; i++)
+		batch[i] = p->runq[(h+i)%nelem(p->runq)];
+	if(!runtime·cas(&p->runqhead, h, h+n))	// cas-release, commits consume
+		return false;
+	batch[n] = gp;
+	// Link the goroutines.
+	for(i=0; i<n; i++)
+		batch[i]->schedlink = batch[i+1];
+	// Now put the batch on global queue.
+	runtime·lock(&runtime·sched);
+	globrunqputbatch(batch[0], batch[n], n+1);
+	runtime·unlock(&runtime·sched);
+	return true;
 }
 
 // Get g from local runnable queue.
+// Executed only by the owner P.
 static G*
 runqget(P *p)
 {
 	G *gp;
-	int32 t, h, s;
+	uint32 t, h;
 
-	if(p->runqhead == p->runqtail)
-		return nil;
-	runtime·lock(p);
-	h = p->runqhead;
-	t = p->runqtail;
-	s = p->runqsize;
-	if(t == h) {
-		runtime·unlock(p);
-		return nil;
+	for(;;) {
+		h = runtime·atomicload(&p->runqhead);	// load-acquire, synchronize with other consumers
+		t = p->runqtail;
+		if(t == h)
+			return nil;
+		gp = p->runq[h%nelem(p->runq)];
+		if(runtime·cas(&p->runqhead, h, h+1))	// cas-release, commits consume
+			return gp;
 	}
-	gp = p->runq[h++];
-	if(h == s)
-		h = 0;
-	p->runqhead = h;
-	runtime·unlock(p);
-	return gp;
 }
 
-// Grow local runnable queue.
-// TODO(dvyukov): consider using fixed-size array
-// and transfer excess to the global list (local queue can grow way too big).
-static void
-runqgrow(P *p)
+// Grabs a batch of goroutines from local runnable queue.
+// batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines.
+// Can be executed by any P.
+static uint32
+runqgrab(P *p, G **batch)
 {
-	G **q;
-	int32 s, t, h, t2;
+	uint32 t, h, n, i;
 
-	h = p->runqhead;
-	t = p->runqtail;
-	s = p->runqsize;
-	t2 = 0;
-	q = runtime·malloc(2*s*sizeof(*q));
-	while(t != h) {
-		q[t2++] = p->runq[h++];
-		if(h == s)
-			h = 0;
+	for(;;) {
+		h = runtime·atomicload(&p->runqhead);	// load-acquire, synchronize with other consumers
+		t = runtime·atomicload(&p->runqtail);	// load-acquire, synchronize with the producer
+		n = t-h;
+		n = n - n/2;
+		if(n == 0)
+			break;
+		if(n > nelem(p->runq)/2)	// read inconsistent h and t
+			continue;
+		for(i=0; i<n; i++)
+			batch[i] = p->runq[(h+i)%nelem(p->runq)];
+		if(runtime·cas(&p->runqhead, h, h+n))	// cas-release, commits consume
+			break;
 	}
-	runtime·free(p->runq);
-	p->runq = q;
-	p->runqhead = 0;
-	p->runqtail = t2;
-	p->runqsize = 2*s;
+	return n;
 }
 
 // Steal half of elements from local runnable queue of p2
@@ -2779,57 +2811,24 @@ runqgrow(P *p)
 static G*
 runqsteal(P *p, P *p2)
 {
-	G *gp, *gp1;
-	int32 t, h, s, t2, h2, s2, c, i;
+	G *gp;
+	G *batch[nelem(p->runq)/2];
+	uint32 t, h, n, i;
 
-	if(p2->runqhead == p2->runqtail)
+	n = runqgrab(p2, batch);
+	if(n == 0)
 		return nil;
-	// sort locks to prevent deadlocks
-	if(p < p2)
-		runtime·lock(p);
-	runtime·lock(p2);
-	if(p2->runqhead == p2->runqtail) {
-		runtime·unlock(p2);
-		if(p < p2)
-			runtime·unlock(p);
-		return nil;
-	}
-	if(p >= p2)
-		runtime·lock(p);
-	// now we've locked both queues and know the victim is not empty
-	h = p->runqhead;
+	n--;
+	gp = batch[n];
+	if(n == 0)
+		return gp;
+	h = runtime·atomicload(&p->runqhead);	// load-acquire, synchronize with consumers
 	t = p->runqtail;
-	s = p->runqsize;
-	h2 = p2->runqhead;
-	t2 = p2->runqtail;
-	s2 = p2->runqsize;
-	gp = p2->runq[h2++];	// return value
-	if(h2 == s2)
-		h2 = 0;
-	// steal roughly half
-	if(t2 > h2)
-		c = (t2 - h2) / 2;
-	else
-		c = (s2 - h2 + t2) / 2;
-	// copy
-	for(i = 0; i != c; i++) {
-		// the target queue is full?
-		if(t == h-1 || (h == 0 && t == s-1))
-			break;
-		// the victim queue is empty?
-		if(t2 == h2)
-			break;
-		gp1 = p2->runq[h2++];
-		if(h2 == s2)
-			h2 = 0;
-		p->runq[t++] = gp1;
-		if(t == s)
-			t = 0;
-	}
-	p->runqtail = t;
-	p2->runqhead = h2;
-	runtime·unlock(p2);
-	runtime·unlock(p);
+	if(t - h + n >= nelem(p->runq))
+		runtime·throw("runqsteal: runq overflow");
+	for(i=0; i<n; i++, t++)
+		p->runq[t%nelem(p->runq)] = batch[i];
+	runtime·atomicstore(&p->runqtail, t);	// store-release, makes the item available for consumption
 	return gp;
 }
 
@@ -2837,14 +2836,10 @@ void
 runtime·testSchedLocalQueue(void)
 {
 	P p;
-	G gs[1000];
+	G gs[nelem(p.runq)];
 	int32 i, j;
 
 	runtime·memclr((byte*)&p, sizeof(p));
-	p.runqsize = 1;
-	p.runqhead = 0;
-	p.runqtail = 0;
-	p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq));
 
 	for(i = 0; i < nelem(gs); i++) {
 		if(runqget(&p) != nil)
@@ -2866,20 +2861,11 @@ void
 runtime·testSchedLocalQueueSteal(void)
 {
 	P p1, p2;
-	G gs[1000], *gp;
+	G gs[nelem(p1.runq)], *gp;
 	int32 i, j, s;
 
 	runtime·memclr((byte*)&p1, sizeof(p1));
-	p1.runqsize = 1;
-	p1.runqhead = 0;
-	p1.runqtail = 0;
-	p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq));
-
 	runtime·memclr((byte*)&p2, sizeof(p2));
-	p2.runqsize = nelem(gs);
-	p2.runqhead = 0;
-	p2.runqtail = 0;
-	p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq));
 
 	for(i = 0; i < nelem(gs); i++) {
 		for(j = 0; j < i; j++) {
diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h
index ef783efae0..236284b93e 100644
--- a/src/pkg/runtime/runtime.h
+++ b/src/pkg/runtime/runtime.h
@@ -373,10 +373,9 @@ struct P
 	MCache*	mcache;
 
 	// Queue of runnable goroutines.
-	G**	runq;
-	int32	runqhead;
-	int32	runqtail;
-	int32	runqsize;
+	uint32	runqhead;
+	uint32	runqtail;
+	G*	runq[256];
 
 	// Available G's (status == Gdead)
 	G*	gfree;
-- 
2.48.1
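
A note on the algorithm, restated outside the runtime: the sketch below
expresses the put/get fast paths of the ring in portable C11 atomics
rather than the runtime's own primitives (runtime·atomicload,
runtime·atomicstore, runtime·cas). It is a minimal illustration under
assumed names, not runtime code: Ring, ring_put, and ring_get are
invented, void* stands in for G*, and the overflow path that
runqputslow handles in the patch is reduced to a false return. head and
tail are free-running counters, so t - h is the queue length even after
the 32-bit counters wrap; this is also why the ring size must be a
power of two, so the t % RINGSIZE slot mapping stays consistent across
wraparound.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RINGSIZE 256	/* power of two, like nelem(p->runq) */

typedef struct Ring {
	atomic_uint	head;	/* advanced by any consumer via CAS */
	atomic_uint	tail;	/* advanced only by the single producer */
	void		*buf[RINGSIZE];
} Ring;

/* Single producer: enqueue g, or return false if the ring is full
 * (the runtime then spills half the ring plus g to the global queue,
 * as runqputslow does above). */
static bool
ring_put(Ring *r, void *g)
{
	unsigned h = atomic_load_explicit(&r->head, memory_order_acquire);
	unsigned t = atomic_load_explicit(&r->tail, memory_order_relaxed);
	if(t - h >= RINGSIZE)
		return false;	/* full; caller spills to global queue */
	r->buf[t % RINGSIZE] = g;
	/* store-release: publish buf[t] before the new tail is visible */
	atomic_store_explicit(&r->tail, t + 1, memory_order_release);
	return true;
}

/* Any consumer (owner or thief): dequeue one element, or NULL if empty. */
static void *
ring_get(Ring *r)
{
	for(;;) {
		unsigned h = atomic_load_explicit(&r->head, memory_order_acquire);
		unsigned t = atomic_load_explicit(&r->tail, memory_order_acquire);
		if(t == h)
			return NULL;	/* empty */
		void *g = r->buf[h % RINGSIZE];
		/* The CAS both claims slot h against other consumers and
		 * validates the speculative read above: if another consumer
		 * advanced head first, the CAS fails and we retry. */
		if(atomic_compare_exchange_weak_explicit(&r->head, &h, h + 1,
				memory_order_acq_rel, memory_order_relaxed))
			return g;
	}
}

The speculative read in ring_get is the heart of the scheme: a consumer
may read a slot that a faster consumer has already claimed and that the
producer has since reused, but in that case head has moved, the CAS
fails, and the stale value is discarded before anyone can see it.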