From c21188473103a14beb3b0950fb2331fea7e16d80 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Tue, 12 Mar 2013 21:14:26 +0400 Subject: [PATCH] runtime: add network polling support into scheduler This is a part of the bigger change that moves network poller into runtime: https://golang.org/cl/7326051/ R=golang-dev, bradfitz, mikioh.mikioh, rsc CC=golang-dev https://golang.org/cl/7448048 --- src/pkg/runtime/netpoll_stub.c | 18 +++++++++ src/pkg/runtime/proc.c | 69 +++++++++++++++++++++++++++++++++- src/pkg/runtime/runtime.h | 3 +- 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 src/pkg/runtime/netpoll_stub.c diff --git a/src/pkg/runtime/netpoll_stub.c b/src/pkg/runtime/netpoll_stub.c new file mode 100644 index 0000000000..90da7a85de --- /dev/null +++ b/src/pkg/runtime/netpoll_stub.c @@ -0,0 +1,18 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build windows + +#include "runtime.h" + +// Polls for ready network connections. +// Returns list of goroutines that become runnable. +G* +runtime·netpoll(bool block) +{ + // Implementation for platforms that do not support + // integrated network poller. + USED(block); + return nil; +} diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c index fff270c4fb..313ac653b4 100644 --- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -49,6 +49,7 @@ struct Sched { Note stopnote; uint32 sysmonwait; Note sysmonnote; + uint64 lastpoll; int32 profilehz; // cpu profiling rate }; @@ -107,6 +108,7 @@ static void globrunqput(G*); static G* globrunqget(P*); static P* pidleget(void); static void pidleput(P*); +static void injectglist(G*); // The bootstrap sequence is: // @@ -135,6 +137,7 @@ runtime·schedinit(void) // so that we don't need to call malloc when we crash. // runtime·findfunc(0); + runtime·sched.lastpoll = runtime·nanotime(); procs = 1; p = runtime·getenv("GOMAXPROCS"); if(p != nil && (n = runtime·atoi(p)) > 0) { @@ -391,8 +394,11 @@ runtime·starttheworld(void) { P *p, *p1; M *mp; + G *gp; bool add; + gp = runtime·netpoll(false); // non-blocking + injectglist(gp); add = needaddgcproc(); runtime·lock(&runtime·sched); if(newprocs) { @@ -976,7 +982,7 @@ execute(G *gp) } // Finds a runnable goroutine to execute. -// Tries to steal from other P's and get g from global queue. +// Tries to steal from other P's, get g from global queue, poll network. static G* findrunnable(void) { @@ -1001,6 +1007,13 @@ top: if(gp) return gp; } + // poll network + gp = runtime·netpoll(false); // non-blocking + if(gp) { + injectglist(gp->schedlink); + gp->status = Grunnable; + return gp; + } // If number of spinning M's >= number of busy P's, block. // This is necessary to prevent excessive CPU consumption // when GOMAXPROCS>>1 but the program parallelism is low. @@ -1055,10 +1068,54 @@ stop: break; } } + // poll network + if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) { + if(m->p) + runtime·throw("findrunnable: netpoll with p"); + if(m->spinning) + runtime·throw("findrunnable: netpoll with spinning"); + gp = runtime·netpoll(true); // block until new work is available + runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime()); + if(gp) { + runtime·lock(&runtime·sched); + p = pidleget(); + runtime·unlock(&runtime·sched); + if(p) { + acquirep(p); + injectglist(gp->schedlink); + gp->status = Grunnable; + return gp; + } + injectglist(gp); + } + } stopm(); goto top; } +// Injects the list of runnable G's into the scheduler. +// Can run concurrently with GC. +static void +injectglist(G *glist) +{ + int32 n; + G *gp; + + if(glist == nil) + return; + runtime·lock(&runtime·sched); + for(n = 0; glist; n++) { + gp = glist; + glist = gp->schedlink; + gp->status = Grunnable; + globrunqput(gp); + } + runtime·unlock(&runtime·sched); + + for(; n && runtime·sched.npidle; n--) + startm(nil, false); +} + // One round of scheduler: find a runnable goroutine and execute it. // Never returns. static void @@ -1916,6 +1973,8 @@ static void sysmon(void) { uint32 idle, delay; + int64 now, lastpoll; + G *gp; uint32 ticks[MaxGomaxprocs]; idle = 0; // how many cycles in succession we had not wokeup somebody @@ -1940,6 +1999,14 @@ sysmon(void) } else runtime·unlock(&runtime·sched); } + // poll network if not polled for more than 10ms + lastpoll = runtime·atomicload64(&runtime·sched.lastpoll); + now = runtime·nanotime(); + if(lastpoll != 0 && lastpoll + 10*1000*1000 > now) { + gp = runtime·netpoll(false); // non-blocking + injectglist(gp); + } + // retake P's blocked in syscalls if(retake(ticks)) idle = 0; else diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index b0276072fd..ffbd5c219d 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -767,6 +767,7 @@ void runtime·blockevent(int64, int32); extern int64 runtime·blockprofilerate; void runtime·addtimer(Timer*); bool runtime·deltimer(Timer*); +G* runtime·netpoll(bool); #pragma varargck argpos runtime·printf 1 #pragma varargck type "d" int32 @@ -968,5 +969,5 @@ extern uint64 ·neginf; enum { - UseSpanType = 1, + UseSpanType = 0, }; -- 2.48.1