]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: add network polling support into scheduler
authorDmitriy Vyukov <dvyukov@google.com>
Tue, 12 Mar 2013 17:14:26 +0000 (21:14 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Tue, 12 Mar 2013 17:14:26 +0000 (21:14 +0400)
This is a part of the bigger change that moves network poller into runtime:
https://golang.org/cl/7326051/

R=golang-dev, bradfitz, mikioh.mikioh, rsc
CC=golang-dev
https://golang.org/cl/7448048

src/pkg/runtime/netpoll_stub.c [new file with mode: 0644]
src/pkg/runtime/proc.c
src/pkg/runtime/runtime.h

diff --git a/src/pkg/runtime/netpoll_stub.c b/src/pkg/runtime/netpoll_stub.c
new file mode 100644 (file)
index 0000000..90da7a8
--- /dev/null
@@ -0,0 +1,18 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build windows
+
+#include "runtime.h"
+
+// Polls for ready network connections.
+// Returns list of goroutines that become runnable.
+G*
+runtime·netpoll(bool block)
+{
+       // Implementation for platforms that do not support
+       // integrated network poller.
+       USED(block);
+       return nil;
+}
index fff270c4fb49735db19b0a77c0d2e04201e35604..313ac653b485e71e40c1aca24f6fd42d2f5911f6 100644 (file)
@@ -49,6 +49,7 @@ struct Sched {
        Note    stopnote;
        uint32  sysmonwait;
        Note    sysmonnote;
+       uint64  lastpoll;
 
        int32   profilehz;      // cpu profiling rate
 };
@@ -107,6 +108,7 @@ static void globrunqput(G*);
 static G* globrunqget(P*);
 static P* pidleget(void);
 static void pidleput(P*);
+static void injectglist(G*);
 
 // The bootstrap sequence is:
 //
@@ -135,6 +137,7 @@ runtime·schedinit(void)
        // so that we don't need to call malloc when we crash.
        // runtime·findfunc(0);
 
+       runtime·sched.lastpoll = runtime·nanotime();
        procs = 1;
        p = runtime·getenv("GOMAXPROCS");
        if(p != nil && (n = runtime·atoi(p)) > 0) {
@@ -391,8 +394,11 @@ runtime·starttheworld(void)
 {
        P *p, *p1;
        M *mp;
+       G *gp;
        bool add;
 
+       gp = runtime·netpoll(false);  // non-blocking
+       injectglist(gp);
        add = needaddgcproc();
        runtime·lock(&runtime·sched);
        if(newprocs) {
@@ -976,7 +982,7 @@ execute(G *gp)
 }
 
 // Finds a runnable goroutine to execute.
-// Tries to steal from other P's and get g from global queue.
+// Tries to steal from other P's, get g from global queue, poll network.
 static G*
 findrunnable(void)
 {
@@ -1001,6 +1007,13 @@ top:
                if(gp)
                        return gp;
        }
+       // poll network
+       gp = runtime·netpoll(false);  // non-blocking
+       if(gp) {
+               injectglist(gp->schedlink);
+               gp->status = Grunnable;
+               return gp;
+       }
        // If number of spinning M's >= number of busy P's, block.
        // This is necessary to prevent excessive CPU consumption
        // when GOMAXPROCS>>1 but the program parallelism is low.
@@ -1055,10 +1068,54 @@ stop:
                        break;
                }
        }
+       // poll network
+       if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
+               if(m->p)
+                       runtime·throw("findrunnable: netpoll with p");
+               if(m->spinning)
+                       runtime·throw("findrunnable: netpoll with spinning");
+               gp = runtime·netpoll(true);  // block until new work is available
+               runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
+               if(gp) {
+                       runtime·lock(&runtime·sched);
+                       p = pidleget();
+                       runtime·unlock(&runtime·sched);
+                       if(p) {
+                               acquirep(p);
+                               injectglist(gp->schedlink);
+                               gp->status = Grunnable;
+                               return gp;
+                       }
+                       injectglist(gp);
+               }
+       }
        stopm();
        goto top;
 }
 
+// Injects the list of runnable G's into the scheduler.
+// Can run concurrently with GC.
+static void
+injectglist(G *glist)
+{
+       int32 n;
+       G *gp;
+
+       if(glist == nil)
+               return;
+       runtime·lock(&runtime·sched);
+       for(n = 0; glist; n++) {
+               gp = glist;
+               glist = gp->schedlink;
+               gp->status = Grunnable;
+               globrunqput(gp);
+       }
+       runtime·unlock(&runtime·sched);
+
+       for(; n && runtime·sched.npidle; n--)
+               startm(nil, false);
+}
+
 // One round of scheduler: find a runnable goroutine and execute it.
 // Never returns.
 static void
@@ -1916,6 +1973,8 @@ static void
 sysmon(void)
 {
        uint32 idle, delay;
+       int64 now, lastpoll;
+       G *gp;
        uint32 ticks[MaxGomaxprocs];
 
        idle = 0;  // how many cycles in succession we had not wokeup somebody
@@ -1940,6 +1999,14 @@ sysmon(void)
                        } else
                                runtime·unlock(&runtime·sched);
                }
+               // poll network if not polled for more than 10ms
+               lastpoll = runtime·atomicload64(&runtime·sched.lastpoll);
+               now = runtime·nanotime();
+               if(lastpoll != 0 && lastpoll + 10*1000*1000 > now) {
+                       gp = runtime·netpoll(false);  // non-blocking
+                       injectglist(gp);
+               }
+               // retake P's blocked in syscalls
                if(retake(ticks))
                        idle = 0;
                else
index b0276072fdb48be237838f7733d836f6c4ead0f5..ffbd5c219d5f829c237d56446f35e3142453d6fe 100644 (file)
@@ -767,6 +767,7 @@ void        runtime·blockevent(int64, int32);
 extern int64 runtime·blockprofilerate;
 void   runtime·addtimer(Timer*);
 bool   runtime·deltimer(Timer*);
+G*     runtime·netpoll(bool);
 
 #pragma        varargck        argpos  runtime·printf 1
 #pragma        varargck        type    "d"     int32
@@ -968,5 +969,5 @@ extern uint64 ·neginf;
 
 enum
 {
-       UseSpanType = 1,
+       UseSpanType = 0,
 };