]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: use lock-free ring for work queues
authorDmitriy Vyukov <dvyukov@google.com>
Thu, 16 Jan 2014 08:17:00 +0000 (12:17 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Thu, 16 Jan 2014 08:17:00 +0000 (12:17 +0400)
Use lock-free fixed-size ring for work queues
instead of an unbounded mutex-protected array.
The ring has single producer and multiple consumers.
If the ring overflows, work is put onto global queue.

benchmark              old ns/op    new ns/op    delta
BenchmarkMatmult               7            5  -18.12%
BenchmarkMatmult-4             2            2  -18.98%
BenchmarkMatmult-16            1            0  -12.84%

BenchmarkCreateGoroutines                     105           88  -16.10%
BenchmarkCreateGoroutines-4                   376          219  -41.76%
BenchmarkCreateGoroutines-16                  241          174  -27.80%
BenchmarkCreateGoroutinesParallel             103           87  -14.66%
BenchmarkCreateGoroutinesParallel-4           169          143  -15.38%
BenchmarkCreateGoroutinesParallel-16          158          151   -4.43%

R=golang-codereviews, rsc
CC=ddetlefs, devon.odell, golang-codereviews
https://golang.org/cl/46170044

src/pkg/runtime/proc.c
src/pkg/runtime/runtime.h

index b4bc72b5b777bdb378d59564e2ecaef3cbc49425..bc371260fc53325ebbc4f72f7e71e9b344de6832 100644 (file)
@@ -79,7 +79,7 @@ static int32  newprocs;
 void runtime·mstart(void);
 static void runqput(P*, G*);
 static G* runqget(P*);
-static void runqgrow(P*);
+static bool runqputslow(P*, G*, uint32, uint32);
 static G* runqsteal(P*, P*);
 static void mput(M*);
 static M* mget(void);
@@ -106,6 +106,7 @@ static void gfput(P*, G*);
 static G* gfget(P*);
 static void gfpurge(P*);
 static void globrunqput(G*);
+static void globrunqputbatch(G*, G*, int32);
 static G* globrunqget(P*, int32);
 static P* pidleget(void);
 static void pidleput(P*);
@@ -2215,27 +2216,26 @@ procresize(int32 new)
                        else
                                p->mcache = runtime·allocmcache();
                }
-               if(p->runq == nil) {
-                       p->runqsize = 128;
-                       p->runq = (G**)runtime·mallocgc(p->runqsize*sizeof(G*), 0, FlagNoInvokeGC);
-               }
        }
 
        // redistribute runnable G's evenly
+       // collect all runnable goroutines in global queue
        for(i = 0; i < old; i++) {
                p = runtime·allp[i];
                while(gp = runqget(p))
                        globrunqput(gp);
        }
+       // fill local queues with at most nelem(p->runq)/2 goroutines
        // start at 1 because current M already executes some G and will acquire allp[0] below,
        // so if we have a spare G we want to put it into allp[1].
-       for(i = 1; runtime·sched.runqhead; i++) {
+       for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) {
                gp = runtime·sched.runqhead;
                runtime·sched.runqhead = gp->schedlink;
+               if(runtime·sched.runqhead == nil)
+                       runtime·sched.runqtail = nil;
+               runtime·sched.runqsize--;
                runqput(runtime·allp[i%new], gp);
        }
-       runtime·sched.runqtail = nil;
-       runtime·sched.runqsize = 0;
 
        // free unused P's
        for(i = new; i < old; i++) {
@@ -2524,7 +2524,7 @@ runtime·schedtrace(bool detailed)
        static int64 starttime;
        int64 now;
        int64 id1, id2, id3;
-       int32 i, q, t, h, s;
+       int32 i, t, h;
        int8 *fmt;
        M *mp, *lockedm;
        G *gp, *lockedg;
@@ -2551,15 +2551,11 @@ runtime·schedtrace(bool detailed)
                if(p == nil)
                        continue;
                mp = p->m;
-               t = p->runqtail;
-               h = p->runqhead;
-               s = p->runqsize;
-               q = t - h;
-               if(q < 0)
-                       q += s;
+               h = runtime·atomicload(&p->runqhead);
+               t = runtime·atomicload(&p->runqtail);
                if(detailed)
-                       runtime·printf("  P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d/%d gfreecnt=%d\n",
-                               i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, q, s, p->gfreecnt);
+                       runtime·printf("  P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d gfreecnt=%d\n",
+                               i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, t-h, p->gfreecnt);
                else {
                        // In non-detailed mode format lengths of per-P run queues as:
                        // [len1 len2 len3 len4]
@@ -2570,7 +2566,7 @@ runtime·schedtrace(bool detailed)
                                fmt = " [%d";
                        else if(i == runtime·gomaxprocs-1)
                                fmt = " %d]\n";
-                       runtime·printf(fmt, q);
+                       runtime·printf(fmt, t-h);
                }
        }
        if(!detailed) {
@@ -2645,6 +2641,20 @@ globrunqput(G *gp)
        runtime·sched.runqsize++;
 }
 
+// Put a batch of runnable goroutines on the global runnable queue.
+// Sched must be locked.
+static void
+globrunqputbatch(G *ghead, G *gtail, int32 n)
+{
+       gtail->schedlink = nil;
+       if(runtime·sched.runqtail)
+               runtime·sched.runqtail->schedlink = ghead;
+       else
+               runtime·sched.runqhead = ghead;
+       runtime·sched.runqtail = gtail;
+       runtime·sched.runqsize += n;
+}
+
 // Try get a batch of G's from the global runnable queue.
 // Sched must be locked.
 static G*
@@ -2660,6 +2670,8 @@ globrunqget(P *p, int32 max)
                n = runtime·sched.runqsize;
        if(max > 0 && n > max)
                n = max;
+       if(n > nelem(p->runq)/2)
+               n = nelem(p->runq)/2;
        runtime·sched.runqsize -= n;
        if(runtime·sched.runqsize == 0)
                runtime·sched.runqtail = nil;
@@ -2699,78 +2711,98 @@ pidleget(void)
        return p;
 }
 
-// Put g on local runnable queue.
-// TODO(dvyukov): consider using lock-free queue.
+// Try to put g on local runnable queue.
+// If it's full, put onto global queue.
+// Executed only by the owner P.
 static void
 runqput(P *p, G *gp)
 {
-       int32 h, t, s;
+       uint32 h, t;
 
-       runtime·lock(p);
 retry:
-       h = p->runqhead;
+       h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with consumers
        t = p->runqtail;
-       s = p->runqsize;
-       if(t == h-1 || (h == 0 && t == s-1)) {
-               runqgrow(p);
-               goto retry;
+       if(t - h < nelem(p->runq)) {
+               p->runq[t%nelem(p->runq)] = gp;
+               runtime·atomicstore(&p->runqtail, t+1);  // store-release, makes the item available for consumption
+               return;
        }
-       p->runq[t++] = gp;
-       if(t == s)
-               t = 0;
-       p->runqtail = t;
-       runtime·unlock(p);
+       if(runqputslow(p, gp, h, t))
+               return;
+       // the queue is not full, now the put above must suceed
+       goto retry;
+}
+
+// Put g and a batch of work from local runnable queue on global queue.
+// Executed only by the owner P.
+static bool
+runqputslow(P *p, G *gp, uint32 h, uint32 t)
+{
+       G *batch[nelem(p->runq)/2+1];
+       uint32 n, i;
+
+       // First, grab a batch from local queue.
+       n = t-h;
+       n = n/2;
+       if(n != nelem(p->runq)/2)
+               runtime·throw("runqputslow: queue is not full");
+       for(i=0; i<n; i++)
+               batch[i] = p->runq[(h+i)%nelem(p->runq)];
+       if(!runtime·cas(&p->runqhead, h, h+n))  // cas-release, commits consume
+               return false;
+       batch[n] = gp;
+       // Link the goroutines.
+       for(i=0; i<n; i++)
+               batch[i]->schedlink = batch[i+1];
+       // Now put the batch on global queue.
+       runtime·lock(&runtime·sched);
+       globrunqputbatch(batch[0], batch[n], n+1);
+       runtime·unlock(&runtime·sched);
+       return true;
 }
 
 // Get g from local runnable queue.
+// Executed only by the owner P.
 static G*
 runqget(P *p)
 {
        G *gp;
-       int32 t, h, s;
+       uint32 t, h;
 
-       if(p->runqhead == p->runqtail)
-               return nil;
-       runtime·lock(p);
-       h = p->runqhead;
-       t = p->runqtail;
-       s = p->runqsize;
-       if(t == h) {
-               runtime·unlock(p);
-               return nil;
+       for(;;) {
+               h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
+               t = p->runqtail;
+               if(t == h)
+                       return nil;
+               gp = p->runq[h%nelem(p->runq)];
+               if(runtime·cas(&p->runqhead, h, h+1))  // cas-release, commits consume
+                       return gp;
        }
-       gp = p->runq[h++];
-       if(h == s)
-               h = 0;
-       p->runqhead = h;
-       runtime·unlock(p);
-       return gp;
 }
 
-// Grow local runnable queue.
-// TODO(dvyukov): consider using fixed-size array
-// and transfer excess to the global list (local queue can grow way too big).
-static void
-runqgrow(P *p)
+// Grabs a batch of goroutines from local runnable queue.
+// batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines.
+// Can be executed by any P.
+static uint32
+runqgrab(P *p, G **batch)
 {
-       G **q;
-       int32 s, t, h, t2;
+       uint32 t, h, n, i;
 
-       h = p->runqhead;
-       t = p->runqtail;
-       s = p->runqsize;
-       t2 = 0;
-       q = runtime·malloc(2*s*sizeof(*q));
-       while(t != h) {
-               q[t2++] = p->runq[h++];
-               if(h == s)
-                       h = 0;
+       for(;;) {
+               h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
+               t = runtime·atomicload(&p->runqtail);  // load-acquire, synchronize with the producer
+               n = t-h;
+               n = n - n/2;
+               if(n == 0)
+                       break;
+               if(n > nelem(p->runq)/2)  // read inconsistent h and t
+                       continue;
+               for(i=0; i<n; i++)
+                       batch[i] = p->runq[(h+i)%nelem(p->runq)];
+               if(runtime·cas(&p->runqhead, h, h+n))  // cas-release, commits consume
+                       break;
        }
-       runtime·free(p->runq);
-       p->runq = q;
-       p->runqhead = 0;
-       p->runqtail = t2;
-       p->runqsize = 2*s;
+       return n;
 }
 
 // Steal half of elements from local runnable queue of p2
@@ -2779,57 +2811,24 @@ runqgrow(P *p)
 static G*
 runqsteal(P *p, P *p2)
 {
-       G *gp, *gp1;
-       int32 t, h, s, t2, h2, s2, c, i;
+       G *gp;
+       G *batch[nelem(p->runq)/2];
+       uint32 t, h, n, i;
 
-       if(p2->runqhead == p2->runqtail)
+       n = runqgrab(p2, batch);
+       if(n == 0)
                return nil;
-       // sort locks to prevent deadlocks
-       if(p < p2)
-               runtime·lock(p);
-       runtime·lock(p2);
-       if(p2->runqhead == p2->runqtail) {
-               runtime·unlock(p2);
-               if(p < p2)
-                       runtime·unlock(p);
-               return nil;
-       }
-       if(p >= p2)
-               runtime·lock(p);
-       // now we've locked both queues and know the victim is not empty
-       h = p->runqhead;
+       n--;
+       gp = batch[n];
+       if(n == 0)
+               return gp;
+       h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with consumers
        t = p->runqtail;
-       s = p->runqsize;
-       h2 = p2->runqhead;
-       t2 = p2->runqtail;
-       s2 = p2->runqsize;
-       gp = p2->runq[h2++];  // return value
-       if(h2 == s2)
-               h2 = 0;
-       // steal roughly half
-       if(t2 > h2)
-               c = (t2 - h2) / 2;
-       else
-               c = (s2 - h2 + t2) / 2;
-       // copy
-       for(i = 0; i != c; i++) {
-               // the target queue is full?
-               if(t == h-1 || (h == 0 && t == s-1))
-                       break;
-               // the victim queue is empty?
-               if(t2 == h2)
-                       break;
-               gp1 = p2->runq[h2++];
-               if(h2 == s2)
-                       h2 = 0;
-               p->runq[t++] = gp1;
-               if(t == s)
-                       t = 0;
-       }
-       p->runqtail = t;
-       p2->runqhead = h2;
-       runtime·unlock(p2);
-       runtime·unlock(p);
+       if(t - h + n >= nelem(p->runq))
+               runtime·throw("runqsteal: runq overflow");
+       for(i=0; i<n; i++, t++)
+               p->runq[t%nelem(p->runq)] = batch[i];
+       runtime·atomicstore(&p->runqtail, t);  // store-release, makes the item available for consumption
        return gp;
 }
 
@@ -2837,14 +2836,10 @@ void
 runtime·testSchedLocalQueue(void)
 {
        P p;
-       G gs[1000];
+       G gs[nelem(p.runq)];
        int32 i, j;
 
        runtime·memclr((byte*)&p, sizeof(p));
-       p.runqsize = 1;
-       p.runqhead = 0;
-       p.runqtail = 0;
-       p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq));
 
        for(i = 0; i < nelem(gs); i++) {
                if(runqget(&p) != nil)
@@ -2866,20 +2861,11 @@ void
 runtime·testSchedLocalQueueSteal(void)
 {
        P p1, p2;
-       G gs[1000], *gp;
+       G gs[nelem(p1.runq)], *gp;
        int32 i, j, s;
 
        runtime·memclr((byte*)&p1, sizeof(p1));
-       p1.runqsize = 1;
-       p1.runqhead = 0;
-       p1.runqtail = 0;
-       p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq));
-
        runtime·memclr((byte*)&p2, sizeof(p2));
-       p2.runqsize = nelem(gs);
-       p2.runqhead = 0;
-       p2.runqtail = 0;
-       p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq));
 
        for(i = 0; i < nelem(gs); i++) {
                for(j = 0; j < i; j++) {
index ef783efae03a2677fd082563b17068eda0b8ec48..236284b93e0d4d4ecf2b6b0af6af430677b51ae6 100644 (file)
@@ -373,10 +373,9 @@ struct P
        MCache* mcache;
 
        // Queue of runnable goroutines.
-       G**     runq;
-       int32   runqhead;
-       int32   runqtail;
-       int32   runqsize;
+       uint32  runqhead;
+       uint32  runqtail;
+       G*      runq[256];
 
        // Available G's (status == Gdead)
        G*      gfree;