Cypherpunks repositories - gostls13.git/commitdiff
runtime: implement local work queues (in preparation for new scheduler)
author    Dmitriy Vyukov <dvyukov@google.com>
          Sat, 23 Feb 2013 04:48:02 +0000 (08:48 +0400)
committer Dmitriy Vyukov <dvyukov@google.com>
          Sat, 23 Feb 2013 04:48:02 +0000 (08:48 +0400)
R=golang-dev, rsc
CC=golang-dev
https://golang.org/cl/7402047

src/pkg/runtime/export_test.go
src/pkg/runtime/proc.c
src/pkg/runtime/proc_test.go
src/pkg/runtime/runtime.h

diff --git a/src/pkg/runtime/export_test.go b/src/pkg/runtime/export_test.go
index c1971cd2d18af077d5a542ffac3bb71b0951b77e..062aea2487ba914713fc81fc114b065ab0a369e7 100644
--- a/src/pkg/runtime/export_test.go
+++ b/src/pkg/runtime/export_test.go
@@ -61,3 +61,9 @@ func ParForIters(desc *ParFor, tid uint32) (uint32, uint32) {
        begin, end := parforiters(desc, uintptr(tid))
        return uint32(begin), uint32(end)
 }
+
+func testSchedLocalQueue()
+func testSchedLocalQueueSteal()
+
+var TestSchedLocalQueue1 = testSchedLocalQueue
+var TestSchedLocalQueueSteal1 = testSchedLocalQueueSteal
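
This hunk uses the runtime's export_test.go convention: testSchedLocalQueue and testSchedLocalQueueSteal are implemented in C (see proc.c below), declared here as Go function stubs, and re-exported through package-level variables so that the external runtime_test package can reach otherwise unexported runtime code. A minimal sketch of the same pattern for an ordinary package, with hypothetical names and import path:

    // queue/queue.go
    package queue

    func push(q []int, v int) []int { return append(q, v) } // unexported

    // queue/export_test.go -- compiled only when testing package queue,
    // so the alias never becomes part of the public API.
    package queue

    var Push = push

    // queue/queue_test.go -- the external test package can now call it.
    package queue_test

    import (
        "testing"

        "example.com/queue"
    )

    func TestPush(t *testing.T) {
        if got := queue.Push(nil, 7); len(got) != 1 || got[0] != 7 {
            t.Fatalf("push: got %v", got)
        }
    }
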
diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index f1e3ad59d77728f496006d2be02ae73371a533ac..4f02d00faaf37d0b3cee205d8f9f959002860cd8 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -154,6 +154,10 @@ static void matchmg(void); // match m's to g's
 static void readylocked(G*);   // ready, but sched is locked
 static void mnextg(M*, G*);
 static void mcommoninit(M*);
+static void runqput(P*, G*);
+static G* runqget(P*);
+static void runqgrow(P*);
+static G* runqsteal(P*, P*);
 
 void
 setmcpumax(uint32 n)
@@ -1755,3 +1759,216 @@ runtime·setcpuprofilerate(void (*fn)(uintptr*, int32), int32 hz)
        if(hz != 0)
                runtime·resetcpuprofiler(hz);
 }
+
+// Put g on local runnable queue.
+// TODO(dvyukov): consider using lock-free queue.
+static void
+runqput(P *p, G *gp)
+{
+       int32 h, t, s;
+
+       runtime·lock(p);
+retry:
+       h = p->runqhead;
+       t = p->runqtail;
+       s = p->runqsize;
+       if(t == h-1 || (h == 0 && t == s-1)) {
+               runqgrow(p);
+               goto retry;
+       }
+       p->runq[t++] = gp;
+       if(t == s)
+               t = 0;
+       p->runqtail = t;
+       runtime·unlock(p);
+}
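
Two details of runqput are worth spelling out. The queue is a circular buffer guarded by the P's lock, and the full test t == h-1 || (h == 0 && t == s-1) is the wrap-around way of saying "the tail is one slot behind the head": one slot is deliberately left unused so that head == tail can unambiguously mean "empty". When the buffer is full, runqgrow doubles it and the put is retried. A Go sketch of the same bounded put (editorial, not the runtime's code):

    package sched

    // ringQ mirrors P's run queue fields; ints stand in for the runtime's *G.
    type ringQ struct {
        buf  []int
        head int
        tail int
    }

    // put reports false when the ring is full; the C code grows the buffer
    // and retries instead of failing.
    func (q *ringQ) put(v int) bool {
        next := q.tail + 1
        if next == len(q.buf) {
            next = 0
        }
        if next == q.head { // full: same as t == h-1 || (h == 0 && t == s-1)
            return false
        }
        q.buf[q.tail] = v
        q.tail = next
        return true
    }
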
+
+// Get g from local runnable queue.
+static G*
+runqget(P *p)
+{
+       G *gp;
+       int32 t, h, s;
+
+       if(p->runqhead == p->runqtail)
+               return nil;
+       runtime·lock(p);
+       h = p->runqhead;
+       t = p->runqtail;
+       s = p->runqsize;
+       if(t == h) {
+               runtime·unlock(p);
+               return nil;
+       }
+       gp = p->runq[h++];
+       if(h == s)
+               h = 0;
+       p->runqhead = h;
+       runtime·unlock(p);
+       return gp;
+}
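
Note the unlocked fast path: runqget reads runqhead and runqtail before taking the lock. The racy read is acceptable because a stale "empty" answer only sends the caller elsewhere for work, and it saves a lock acquisition on idle queues; the check is then repeated under the lock, since another thread may have drained the queue in between. The matching get for the ringQ sketch above:

    // get removes and returns the oldest element, or reports false when empty.
    func (q *ringQ) get() (int, bool) {
        if q.head == q.tail { // empty
            return 0, false
        }
        v := q.buf[q.head]
        q.head++
        if q.head == len(q.buf) {
            q.head = 0
        }
        return v, true
    }
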
+
+// Grow local runnable queue.
+// TODO(dvyukov): consider using fixed-size array
+// and transfer excess to the global list (local queue can grow way too big).
+static void
+runqgrow(P *p)
+{
+       G **q;
+       int32 s, t, h, t2;
+
+       h = p->runqhead;
+       t = p->runqtail;
+       s = p->runqsize;
+       t2 = 0;
+       q = runtime·malloc(2*s*sizeof(*q));
+       while(t != h) {
+               q[t2++] = p->runq[h++];
+               if(h == s)
+                       h = 0;
+       }
+       runtime·free(p->runq);
+       p->runq = q;
+       p->runqhead = 0;
+       p->runqtail = t2;
+       p->runqsize = 2*s;
+}
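
runqgrow runs with the P's lock already held (its only caller is runqput). It allocates a buffer of twice the size and copies the live elements in queue order, un-wrapping them so the new queue starts at index 0; the old buffer is then released explicitly with runtime·free, where ordinary Go code would simply drop the reference and let the collector reclaim it. The equivalent doubling for the ringQ sketch:

    // grow doubles the buffer, copying live elements in FIFO order and
    // un-wrapping them to start at index 0.
    func (q *ringQ) grow() {
        dst := make([]int, 2*len(q.buf))
        n := 0
        for h := q.head; h != q.tail; {
            dst[n] = q.buf[h]
            n++
            h++
            if h == len(q.buf) {
                h = 0
            }
        }
        q.buf, q.head, q.tail = dst, 0, n
    }
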
+
+// Steal half of elements from local runnable queue of p2
+// and put onto local runnable queue of p.
+// Returns one of the stolen elements (or nil if failed).
+static G*
+runqsteal(P *p, P *p2)
+{
+       G *gp, *gp1;
+       int32 t, h, s, t2, h2, s2, c, i;
+
+       if(p2->runqhead == p2->runqtail)
+               return nil;
+       // sort locks to prevent deadlocks
+       if(p < p2)
+               runtime·lock(p);
+       runtime·lock(p2);
+       if(p2->runqhead == p2->runqtail) {
+               runtime·unlock(p2);
+               if(p < p2)
+                       runtime·unlock(p);
+               return nil;
+       }
+       if(p >= p2)
+               runtime·lock(p);
+       // now we've locked both queues and know the victim is not empty
+       h = p->runqhead;
+       t = p->runqtail;
+       s = p->runqsize;
+       h2 = p2->runqhead;
+       t2 = p2->runqtail;
+       s2 = p2->runqsize;
+       gp = p2->runq[h2++];  // return value
+       if(h2 == s2)
+               h2 = 0;
+       // steal roughly half
+       if(t2 > h2)
+               c = (t2 - h2) / 2;
+       else
+               c = (s2 - h2 + t2) / 2;
+       // copy
+       for(i = 0; i != c; i++) {
+               // the target queue is full?
+               if(t == h-1 || (h == 0 && t == s-1))
+                       break;
+               // the victim queue is empty?
+               if(t2 == h2)
+                       break;
+               gp1 = p2->runq[h2++];
+               if(h2 == s2)
+                       h2 = 0;
+               p->runq[t++] = gp1;
+               if(t == s)
+                       t = 0;
+       }
+       p->runqtail = t;
+       p2->runqhead = h2;
+       runtime·unlock(p2);
+       runtime·unlock(p);
+       return gp;
+}
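
runqsteal must hold two queue locks at once, which is where the deadlock risk lives: if P1 steals from P2 while P2 steals from P1 and each locks its own queue first, both block forever. The code therefore acquires the locks in address order, with a refinement: the victim is rechecked as soon as its lock is held, so when the thief's address is higher and the victim turns out to have been drained, the thief's own lock is never touched. A successful steal then moves one G returned directly plus half of what remains. A sketch of the address-ordering idea in Go, with sync.Mutex standing in for the runtime's Lock:

    package sched

    import (
        "sync"
        "unsafe"
    )

    // lockBoth takes both mutexes in address order, so two goroutines locking
    // the same pair in opposite roles cannot deadlock.
    func lockBoth(a, b *sync.Mutex) {
        if uintptr(unsafe.Pointer(a)) > uintptr(unsafe.Pointer(b)) {
            a, b = b, a
        }
        a.Lock()
        b.Lock()
    }
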
+
+void
+runtime·testSchedLocalQueue(void)
+{
+       P p;
+       G gs[1000];
+       int32 i, j;
+
+       runtime·memclr((byte*)&p, sizeof(p));
+       p.runqsize = 1;
+       p.runqhead = 0;
+       p.runqtail = 0;
+       p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq));
+
+       for(i = 0; i < nelem(gs); i++) {
+               if(runqget(&p) != nil)
+                       runtime·throw("runq is not empty initially");
+               for(j = 0; j < i; j++)
+                       runqput(&p, &gs[i]);
+               for(j = 0; j < i; j++) {
+                       if(runqget(&p) != &gs[i]) {
+                               runtime·printf("bad element at iter %d/%d\n", i, j);
+                               runtime·throw("bad element");
+                       }
+               }
+               if(runqget(&p) != nil)
+                       runtime·throw("runq is not empty afterwards");
+       }
+}
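
Two properties of this test are worth noting. Starting with runqsize 1 means almost every runqput in the inner loop hits the full condition, so the growth path is exercised continuously up to a thousand elements. And because it enqueues i copies of the same pointer &gs[i], it verifies element counts and round-tripping rather than ordering; a stricter FIFO variant, expressed against the ringQ sketch above, would use distinct values:

    // checkFIFO puts n distinct values and requires them back in insertion
    // order, growing on demand the way runqput does. Start with a size-1
    // buffer, e.g. checkFIFO(&ringQ{buf: make([]int, 1)}, 1000), to force
    // repeated growth as the C test does.
    func checkFIFO(q *ringQ, n int) bool {
        for v := 0; v < n; v++ {
            for !q.put(v) {
                q.grow() // mimic runqput's grow-and-retry
            }
        }
        for v := 0; v < n; v++ {
            if got, ok := q.get(); !ok || got != v {
                return false
            }
        }
        _, ok := q.get()
        return !ok // must be empty afterwards
    }
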
+
+void
+runtime·testSchedLocalQueueSteal(void)
+{
+       P p1, p2;
+       G gs[1000], *gp;
+       int32 i, j, s;
+
+       runtime·memclr((byte*)&p1, sizeof(p1));
+       p1.runqsize = 1;
+       p1.runqhead = 0;
+       p1.runqtail = 0;
+       p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq));
+
+       runtime·memclr((byte*)&p2, sizeof(p2));
+       p2.runqsize = nelem(gs);
+       p2.runqhead = 0;
+       p2.runqtail = 0;
+       p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq));
+
+       for(i = 0; i < nelem(gs); i++) {
+               for(j = 0; j < i; j++) {
+                       gs[j].sig = 0;
+                       runqput(&p1, &gs[j]);
+               }
+               gp = runqsteal(&p2, &p1);
+               s = 0;
+               if(gp) {
+                       s++;
+                       gp->sig++;
+               }
+               while(gp = runqget(&p2)) {
+                       s++;
+                       gp->sig++;
+               }
+               while(gp = runqget(&p1))
+                       gp->sig++;
+               for(j = 0; j < i; j++) {
+                       if(gs[j].sig != 1) {
+                               runtime·printf("bad element %d(%d) at iter %d\n", j, gs[j].sig, i);
+                               runtime·throw("bad element");
+                       }
+               }
+               if(s != i/2 && s != i/2+1) {
+                       runtime·printf("bad steal %d, want %d or %d, iter %d\n",
+                               s, i/2, i/2+1, i);
+                       runtime·throw("bad steal");
+               }
+       }
+}
\ No newline at end of file
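
The accepted values for s follow from runqsteal's arithmetic: the thief takes one G up front and then copies half of the remaining i-1, so a successful steal moves 1 + (i-1)/2 elements, which works out to i/2 for even i and i/2+1 for odd i (and 0 when the victim is empty). As a quick sanity check:

    // expectedSteal reproduces the count runqsteal should move from a queue
    // of i elements (a sketch of the arithmetic only).
    func expectedSteal(i int) int {
        if i == 0 {
            return 0
        }
        return 1 + (i-1)/2 // i/2 for even i, i/2+1 for odd i
    }
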
diff --git a/src/pkg/runtime/proc_test.go b/src/pkg/runtime/proc_test.go
index 1f727da073c343aa3bb0f794405bf75d742fb6d2..b9d57a6da1ab19813a7f9c35969407e0a4c035f4 100644
--- a/src/pkg/runtime/proc_test.go
+++ b/src/pkg/runtime/proc_test.go
@@ -113,6 +113,14 @@ func stackGrowthRecursive(i int) {
        }
 }
 
+func TestSchedLocalQueue(t *testing.T) {
+       runtime.TestSchedLocalQueue1()
+}
+
+func TestSchedLocalQueueSteal(t *testing.T) {
+       runtime.TestSchedLocalQueueSteal1()
+}
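
These thin wrappers are what make the C tests reachable from the ordinary test harness; both should be runnable with something like go test -run 'TestSchedLocalQueue' runtime.
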
+
 func benchmarkStackGrowth(b *testing.B, rec int) {
        const CallsPerSched = 1000
        procs := runtime.GOMAXPROCS(-1)
diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h
index 24591995c83091a5359651020f405b6b2f5d70e3..61e33eb95e074989c868da27b514b589a062b183 100644
--- a/src/pkg/runtime/runtime.h
+++ b/src/pkg/runtime/runtime.h
@@ -52,6 +52,7 @@ typedef       struct  G               G;
 typedef        struct  Gobuf           Gobuf;
 typedef        union   Lock            Lock;
 typedef        struct  M               M;
+typedef        struct  P               P;
 typedef        struct  Mem             Mem;
 typedef        union   Note            Note;
 typedef        struct  Slice           Slice;
@@ -312,6 +313,17 @@ struct     M
        uintptr end[];
 };
 
+struct P
+{
+       Lock;
+
+       // Queue of runnable goroutines.
+       G**     runq;
+       int32   runqhead;
+       int32   runqtail;
+       int32   runqsize;
+};
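
struct P is the seed of the per-processor state that the new scheduler announced in the commit message will hang work off of. The anonymous Lock field matters: in the runtime's C dialect it means a P* can be handed directly to runtime·lock, which is why the functions above lock p itself rather than a named member. A rough Go analogue, with sync.Mutex standing in for the runtime Lock and an empty placeholder for G:

    package sched

    import "sync"

    type G struct{} // stand-in for the runtime's G

    // P mirrors the C struct: an embedded lock plus a circular run queue.
    type P struct {
        sync.Mutex        // embedded, like the anonymous Lock in the C struct
        runq     []*G     // circular buffer of runnable goroutines
        runqhead int32    // index of the next G to dequeue
        runqtail int32    // index of the next free slot
        runqsize int32    // == len(runq); explicit in C, where a pointer carries no length
    }
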
+
 // The m->locked word holds a single bit saying whether
 // external calls to LockOSThread are in effect, and then a counter
 // of the internal nesting depth of lockOSThread / unlockOSThread.