From 9cbd2fb1aa7ac3d4cd33442a93187d8549dbf1c4 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Wed, 22 Jan 2014 11:27:16 +0400 Subject: [PATCH] runtime: remove locks from netpoll hotpaths Introduces two-phase goroutine parking mechanism -- prepare to park, commit park. This mechanism does not require backing mutex to protect wait predicate. Use it in netpoll. See comment in netpoll.goc for details. This slightly reduces contention between reader, writer and read/write io notifications; and just eliminates a bunch of mutex operations from hotpaths, thus making then faster. benchmark old ns/op new ns/op delta BenchmarkTCP4ConcurrentReadWrite 2109 1945 -7.78% BenchmarkTCP4ConcurrentReadWrite-2 1162 1113 -4.22% BenchmarkTCP4ConcurrentReadWrite-4 798 755 -5.39% BenchmarkTCP4ConcurrentReadWrite-8 803 748 -6.85% BenchmarkTCP4Persistent 9411 9240 -1.82% BenchmarkTCP4Persistent-2 5888 5813 -1.27% BenchmarkTCP4Persistent-4 4016 3968 -1.20% BenchmarkTCP4Persistent-8 3943 3857 -2.18% R=golang-codereviews, mikioh.mikioh, gobot, iant, rsc CC=golang-codereviews, khr https://golang.org/cl/45700043 --- src/pkg/runtime/asm_386.s | 6 ++ src/pkg/runtime/asm_amd64.s | 6 ++ src/pkg/runtime/atomic_arm.c | 13 ++++ src/pkg/runtime/chan.c | 18 +++-- src/pkg/runtime/mgc0.c | 2 +- src/pkg/runtime/netpoll.goc | 124 ++++++++++++++++++++++------------- src/pkg/runtime/proc.c | 30 +++++++-- src/pkg/runtime/runtime.h | 42 ++++++------ src/pkg/runtime/sema.goc | 6 +- src/pkg/runtime/time.goc | 4 +- 10 files changed, 173 insertions(+), 78 deletions(-) diff --git a/src/pkg/runtime/asm_386.s b/src/pkg/runtime/asm_386.s index 5c642c0ed8..ccd2567fdc 100644 --- a/src/pkg/runtime/asm_386.s +++ b/src/pkg/runtime/asm_386.s @@ -483,6 +483,12 @@ TEXT runtime·xchg(SB), NOSPLIT, $0-8 XCHGL AX, 0(BX) RET +TEXT runtime·xchgp(SB), NOSPLIT, $0-8 + MOVL 4(SP), BX + MOVL 8(SP), AX + XCHGL AX, 0(BX) + RET + TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL 4(SP), AX again: diff --git a/src/pkg/runtime/asm_amd64.s b/src/pkg/runtime/asm_amd64.s index 980bcd4520..17e91c04db 100644 --- a/src/pkg/runtime/asm_amd64.s +++ b/src/pkg/runtime/asm_amd64.s @@ -549,6 +549,12 @@ TEXT runtime·xchg64(SB), NOSPLIT, $0-16 XCHGQ AX, 0(BX) RET +TEXT runtime·xchgp(SB), NOSPLIT, $0-16 + MOVQ 8(SP), BX + MOVQ 16(SP), AX + XCHGQ AX, 0(BX) + RET + TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL 8(SP), AX again: diff --git a/src/pkg/runtime/atomic_arm.c b/src/pkg/runtime/atomic_arm.c index b1e97b27dd..87e88d7563 100644 --- a/src/pkg/runtime/atomic_arm.c +++ b/src/pkg/runtime/atomic_arm.c @@ -41,6 +41,19 @@ runtime·xchg(uint32 volatile* addr, uint32 v) } } +#pragma textflag NOSPLIT +void* +runtime·xchgp(void* volatile* addr, void* v) +{ + void *old; + + for(;;) { + old = *addr; + if(runtime·cas(addr, old, v)) + return old; + } +} + #pragma textflag NOSPLIT void runtime·procyield(uint32 cnt) diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index bb3388548d..fd382f80f1 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -224,7 +224,7 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc) mysg.selgen = NOSELGEN; g->param = nil; enqueue(&c->sendq, &mysg); - runtime·park(runtime·unlock, c, "chan send"); + runtime·parkunlock(c, "chan send"); if(g->param == nil) { runtime·lock(c); @@ -252,7 +252,7 @@ asynch: mysg.elem = nil; mysg.selgen = NOSELGEN; enqueue(&c->sendq, &mysg); - runtime·park(runtime·unlock, c, "chan send"); + runtime·parkunlock(c, "chan send"); runtime·lock(c); goto asynch; @@ -356,7 +356,7 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive mysg.selgen = NOSELGEN; g->param = nil; enqueue(&c->recvq, &mysg); - runtime·park(runtime·unlock, c, "chan receive"); + runtime·parkunlock(c, "chan receive"); if(g->param == nil) { runtime·lock(c); @@ -387,7 +387,7 @@ asynch: mysg.elem = nil; mysg.selgen = NOSELGEN; enqueue(&c->recvq, &mysg); - runtime·park(runtime·unlock, c, "chan receive"); + runtime·parkunlock(c, "chan receive"); runtime·lock(c); goto asynch; @@ -799,6 +799,14 @@ selunlock(Select *sel) } } +static bool +selparkcommit(G *gp, void *sel) +{ + USED(gp); + selunlock(sel); + return true; +} + void runtime·block(void) { @@ -971,7 +979,7 @@ loop: } g->param = nil; - runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select"); + runtime·park(selparkcommit, sel, "select"); sellock(sel); sg = g->param; diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c index ebcc364618..2c82fb3ac4 100644 --- a/src/pkg/runtime/mgc0.c +++ b/src/pkg/runtime/mgc0.c @@ -2307,7 +2307,7 @@ runfinq(void) finq = nil; if(fb == nil) { fingwait = 1; - runtime·park(runtime·unlock, &finlock, "finalizer wait"); + runtime·parkunlock(&finlock, "finalizer wait"); continue; } runtime·unlock(&finlock); diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc index 9b5176645a..2830f882d8 100644 --- a/src/pkg/runtime/netpoll.goc +++ b/src/pkg/runtime/netpoll.goc @@ -19,21 +19,40 @@ package net // An implementation must call the following function to denote that the pd is ready. // void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode); +// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer +// goroutines respectively. The semaphore can be in the following states: +// READY - io readiness notification is pending; +// a goroutine consumes the notification by changing the state to nil. +// WAIT - a goroutine prepares to park on the semaphore, but not yet parked; +// the goroutine commits to park by changing the state to G pointer, +// or, alternatively, concurrent io notification changes the state to READY, +// or, alternatively, concurrent timeout/close changes the state to nil. +// G pointer - the goroutine is blocked on the semaphore; +// io notification or timeout/close changes the state to READY or nil respectively +// and unparks the goroutine. +// nil - nothing of the above. #define READY ((G*)1) +#define WAIT ((G*)2) struct PollDesc { PollDesc* link; // in pollcache, protected by pollcache.Lock + + // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. + // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. + // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO rediness notification) + // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated + // in a lock-free way by all operations. Lock; // protectes the following fields uintptr fd; bool closing; uintptr seq; // protects from stale timers and ready notifications - G* rg; // G waiting for read or READY (binary semaphore) + G* rg; // READY, WAIT, G waiting for read or nil Timer rt; // read deadline timer (set if rt.fv != nil) int64 rd; // read deadline - G* wg; // the same for writes - Timer wt; - int64 wd; + G* wg; // READY, WAIT, G waiting for write or nil + Timer wt; // write deadline timer + int64 wd; // write deadline }; static struct @@ -47,7 +66,7 @@ static struct // seq is incremented when deadlines are changed or descriptor is reused. } pollcache; -static bool netpollblock(PollDesc*, int32); +static bool netpollblock(PollDesc*, int32, bool); static G* netpollunblock(PollDesc*, int32, bool); static void deadline(int64, Eface); static void readDeadline(int64, Eface); @@ -97,7 +116,6 @@ func runtime_pollClose(pd *PollDesc) { } func runtime_pollReset(pd *PollDesc, mode int) (err int) { - runtime·lock(pd); err = checkerr(pd, mode); if(err) goto ret; @@ -106,11 +124,9 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) { else if(mode == 'w') pd->wg = nil; ret: - runtime·unlock(pd); } func runtime_pollWait(pd *PollDesc, mode int) (err int) { - runtime·lock(pd); err = checkerr(pd, mode); if(err == 0) { #ifdef GOOS_solaris @@ -119,7 +135,7 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) { else if(mode == 'w') runtime·netpollarmwrite(pd->fd); #endif - while(!netpollblock(pd, mode)) { + while(!netpollblock(pd, mode, false)) { err = checkerr(pd, mode); if(err != 0) break; @@ -128,11 +144,9 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) { // Pretend it has not happened and retry. } } - runtime·unlock(pd); } func runtime_pollWaitCanceled(pd *PollDesc, mode int) { - runtime·lock(pd); #ifdef GOOS_solaris if(mode == 'r') runtime·netpollarmread(pd->fd); @@ -140,9 +154,8 @@ func runtime_pollWaitCanceled(pd *PollDesc, mode int) { runtime·netpollarmwrite(pd->fd); #endif // wait for ioready, ignore closing or timeouts. - while(!netpollblock(pd, mode)) + while(!netpollblock(pd, mode, true)) ; - runtime·unlock(pd); } func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { @@ -197,7 +210,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { } // If we set the new deadline in the past, unblock currently pending IO if any. rg = nil; - wg = nil; + runtime·atomicstorep(&wg, nil); // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock if(pd->rd < 0) rg = netpollunblock(pd, 'r', false); if(pd->wd < 0) @@ -217,6 +230,7 @@ func runtime_pollUnblock(pd *PollDesc) { runtime·throw("runtime_pollUnblock: already closing"); pd->closing = true; pd->seq++; + runtime·atomicstorep(&rg, nil); // full memory barrier between store to closing and read of rg/wg in netpollunblock rg = netpollunblock(pd, 'r', false); wg = netpollunblock(pd, 'w', false); if(pd->rt.fv) { @@ -247,12 +261,10 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode) G *rg, *wg; rg = wg = nil; - runtime·lock(pd); if(mode == 'r' || mode == 'r'+'w') rg = netpollunblock(pd, 'r', true); if(mode == 'w' || mode == 'r'+'w') wg = netpollunblock(pd, 'w', true); - runtime·unlock(pd); if(rg) { rg->schedlink = *gpp; *gpp = rg; @@ -273,51 +285,75 @@ checkerr(PollDesc *pd, int32 mode) return 0; } +static bool +blockcommit(G *gp, G **gpp) +{ + return runtime·casp(gpp, WAIT, gp); +} + // returns true if IO is ready, or false if timedout or closed +// waitio - wait only for completed IO, ignore errors static bool -netpollblock(PollDesc *pd, int32 mode) +netpollblock(PollDesc *pd, int32 mode, bool waitio) { - G **gpp; + G **gpp, *old; gpp = &pd->rg; if(mode == 'w') gpp = &pd->wg; - if(*gpp == READY) { - *gpp = nil; - return true; + + // set the gpp semaphore to WAIT + for(;;) { + old = *gpp; + if(old == READY) { + *gpp = nil; + return true; + } + if(old != nil) + runtime·throw("netpollblock: double wait"); + if(runtime·casp(gpp, nil, WAIT)) + break; } - if(*gpp != nil) - runtime·throw("netpollblock: double wait"); - *gpp = g; - runtime·park(runtime·unlock, &pd->Lock, "IO wait"); - runtime·lock(pd); - if(g->param) - return true; - return false; + + // need to recheck error states after setting gpp to WAIT + // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl + // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg + if(waitio || checkerr(pd, mode) == 0) + runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait"); + // be careful to not lose concurrent READY notification + old = runtime·xchgp(gpp, nil); + if(old > WAIT) + runtime·throw("netpollblock: corrupted state"); + return old == READY; } static G* netpollunblock(PollDesc *pd, int32 mode, bool ioready) { - G **gpp, *old; + G **gpp, *old, *new; gpp = &pd->rg; if(mode == 'w') gpp = &pd->wg; - if(*gpp == READY) - return nil; - if(*gpp == nil) { - // Only set READY for ioready. runtime_pollWait - // will check for timeout/cancel before waiting. + + for(;;) { + old = *gpp; + if(old == READY) + return nil; + if(old == nil && !ioready) { + // Only set READY for ioready. runtime_pollWait + // will check for timeout/cancel before waiting. + return nil; + } + new = nil; if(ioready) - *gpp = READY; - return nil; + new = READY; + if(runtime·casp(gpp, old, new)) + break; } - old = *gpp; - // pass unblock reason onto blocked g - old->param = (void*)ioready; - *gpp = nil; - return old; + if(old > WAIT) + return old; // must be G* + return nil; } static void @@ -343,14 +379,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write) if(pd->rd <= 0 || pd->rt.fv == nil) runtime·throw("deadlineimpl: inconsistent read deadline"); pd->rd = -1; - pd->rt.fv = nil; + runtime·atomicstorep(&pd->rt.fv, nil); // full memory barrier between store to rd and load of rg in netpollunblock rg = netpollunblock(pd, 'r', false); } if(write) { if(pd->wd <= 0 || (pd->wt.fv == nil && !read)) runtime·throw("deadlineimpl: inconsistent write deadline"); pd->wd = -1; - pd->wt.fv = nil; + runtime·atomicstorep(&pd->wt.fv, nil); // full memory barrier between store to wd and load of wg in netpollunblock wg = netpollunblock(pd, 'w', false); } runtime·unlock(pd); diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c index 9eb4ad9f95..24feda4183 100644 --- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -1353,10 +1353,10 @@ top: execute(gp); } -// Puts the current goroutine into a waiting state and unlocks the lock. -// The goroutine can be made runnable again by calling runtime·ready(gp). +// Puts the current goroutine into a waiting state and calls unlockf. +// If unlockf returns false, the goroutine is resumed. void -runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason) +runtime·park(bool(*unlockf)(G*, void*), void *lock, int8 *reason) { m->waitlock = lock; m->waitunlockf = unlockf; @@ -1364,17 +1364,39 @@ runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason) runtime·mcall(park0); } +static bool +parkunlock(G *gp, void *lock) +{ + USED(gp); + runtime·unlock(lock); + return true; +} + +// Puts the current goroutine into a waiting state and unlocks the lock. +// The goroutine can be made runnable again by calling runtime·ready(gp). +void +runtime·parkunlock(Lock *lock, int8 *reason) +{ + runtime·park(parkunlock, lock, reason); +} + // runtime·park continuation on g0. static void park0(G *gp) { + bool ok; + gp->status = Gwaiting; gp->m = nil; m->curg = nil; if(m->waitunlockf) { - m->waitunlockf(m->waitlock); + ok = m->waitunlockf(gp, m->waitlock); m->waitunlockf = nil; m->waitlock = nil; + if(!ok) { + gp->status = Grunnable; + execute(gp); // Schedule it back, never returns. + } } if(m->lockedg) { stoplockedm(); diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index 5e3c0c497f..c4c47964b9 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -339,7 +339,7 @@ struct M GCStats gcstats; bool racecall; bool needextram; - void (*waitunlockf)(Lock*); + bool (*waitunlockf)(G*, void*); void* waitlock; uintptr settype_buf[1024]; @@ -790,21 +790,6 @@ int32 runtime·read(int32, void*, int32); int32 runtime·write(int32, void*, int32); int32 runtime·close(int32); int32 runtime·mincore(void*, uintptr, byte*); -bool runtime·cas(uint32*, uint32, uint32); -bool runtime·cas64(uint64*, uint64, uint64); -bool runtime·casp(void**, void*, void*); -// Don't confuse with XADD x86 instruction, -// this one is actually 'addx', that is, add-and-fetch. -uint32 runtime·xadd(uint32 volatile*, int32); -uint64 runtime·xadd64(uint64 volatile*, int64); -uint32 runtime·xchg(uint32 volatile*, uint32); -uint64 runtime·xchg64(uint64 volatile*, uint64); -uint32 runtime·atomicload(uint32 volatile*); -void runtime·atomicstore(uint32 volatile*, uint32); -void runtime·atomicstore64(uint64 volatile*, uint64); -uint64 runtime·atomicload64(uint64 volatile*); -void* runtime·atomicloadp(void* volatile*); -void runtime·atomicstorep(void* volatile*, void*); void runtime·jmpdefer(FuncVal*, void*); void runtime·exit1(int32); void runtime·ready(G*); @@ -845,14 +830,33 @@ uint32 runtime·fastrand1(void); void runtime·rewindmorestack(Gobuf*); int32 runtime·timediv(int64, int32, int32*); -void runtime·setmg(M*, G*); -void runtime·newextram(void); +// atomic operations +bool runtime·cas(uint32*, uint32, uint32); +bool runtime·cas64(uint64*, uint64, uint64); +bool runtime·casp(void**, void*, void*); +// Don't confuse with XADD x86 instruction, +// this one is actually 'addx', that is, add-and-fetch. +uint32 runtime·xadd(uint32 volatile*, int32); +uint64 runtime·xadd64(uint64 volatile*, int64); +uint32 runtime·xchg(uint32 volatile*, uint32); +uint64 runtime·xchg64(uint64 volatile*, uint64); +void* runtime·xchgp(void* volatile*, void*); +uint32 runtime·atomicload(uint32 volatile*); +void runtime·atomicstore(uint32 volatile*, uint32); +void runtime·atomicstore64(uint64 volatile*, uint64); +uint64 runtime·atomicload64(uint64 volatile*); +void* runtime·atomicloadp(void* volatile*); +void runtime·atomicstorep(void* volatile*, void*); + +void runtime·setmg(M*, G*); +void runtime·newextram(void); void runtime·exit(int32); void runtime·breakpoint(void); void runtime·gosched(void); void runtime·gosched0(G*); void runtime·schedtrace(bool); -void runtime·park(void(*)(Lock*), Lock*, int8*); +void runtime·park(bool(*)(G*, void*), void*, int8*); +void runtime·parkunlock(Lock*, int8*); void runtime·tsleep(int64, int8*); M* runtime·newm(void); void runtime·goexit(void); diff --git a/src/pkg/runtime/sema.goc b/src/pkg/runtime/sema.goc index 57f32a0ddb..c1e8e4e18b 100644 --- a/src/pkg/runtime/sema.goc +++ b/src/pkg/runtime/sema.goc @@ -137,7 +137,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile) // Any semrelease after the cansemacquire knows we're waiting // (we set nwait above), so go to sleep. semqueue(root, addr, &s); - runtime·park(runtime·unlock, root, "semacquire"); + runtime·parkunlock(root, "semacquire"); if(cansemacquire(addr)) { if(t0) runtime·blockevent(s.releasetime - t0, 3); @@ -254,7 +254,7 @@ func runtime_Syncsemacquire(s *SyncSema) { else s->tail->next = &w; s->tail = &w; - runtime·park(runtime·unlock, s, "semacquire"); + runtime·parkunlock(s, "semacquire"); if(t0) runtime·blockevent(w.releasetime - t0, 2); } @@ -288,7 +288,7 @@ func runtime_Syncsemrelease(s *SyncSema, n uint32) { else s->tail->next = &w; s->tail = &w; - runtime·park(runtime·unlock, s, "semarelease"); + runtime·parkunlock(s, "semarelease"); } else runtime·unlock(s); } diff --git a/src/pkg/runtime/time.goc b/src/pkg/runtime/time.goc index b575696f71..d52a3b3217 100644 --- a/src/pkg/runtime/time.goc +++ b/src/pkg/runtime/time.goc @@ -76,7 +76,7 @@ runtime·tsleep(int64 ns, int8 *reason) t.arg.data = g; runtime·lock(&timers); addtimer(&t); - runtime·park(runtime·unlock, &timers, reason); + runtime·parkunlock(&timers, reason); } static FuncVal timerprocv = {timerproc}; @@ -222,7 +222,7 @@ timerproc(void) if(delta < 0) { // No timers left - put goroutine to sleep. timers.rescheduling = true; - runtime·park(runtime·unlock, &timers, "timer goroutine (idle)"); + runtime·parkunlock(&timers, "timer goroutine (idle)"); continue; } // At least one timer pending. Sleep until then. -- 2.48.1