]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: remove locks from netpoll hotpaths
authorDmitriy Vyukov <dvyukov@google.com>
Wed, 22 Jan 2014 07:27:16 +0000 (11:27 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Wed, 22 Jan 2014 07:27:16 +0000 (11:27 +0400)
Introduces two-phase goroutine parking mechanism -- prepare to park, commit park.
This mechanism does not require backing mutex to protect wait predicate.
Use it in netpoll. See comment in netpoll.goc for details.
This slightly reduces contention between reader, writer and read/write io notifications;
and just eliminates a bunch of mutex operations from hotpaths, thus making then faster.

benchmark                             old ns/op    new ns/op    delta
BenchmarkTCP4ConcurrentReadWrite           2109         1945   -7.78%
BenchmarkTCP4ConcurrentReadWrite-2         1162         1113   -4.22%
BenchmarkTCP4ConcurrentReadWrite-4          798          755   -5.39%
BenchmarkTCP4ConcurrentReadWrite-8          803          748   -6.85%
BenchmarkTCP4Persistent                    9411         9240   -1.82%
BenchmarkTCP4Persistent-2                  5888         5813   -1.27%
BenchmarkTCP4Persistent-4                  4016         3968   -1.20%
BenchmarkTCP4Persistent-8                  3943         3857   -2.18%

R=golang-codereviews, mikioh.mikioh, gobot, iant, rsc
CC=golang-codereviews, khr
https://golang.org/cl/45700043

src/pkg/runtime/asm_386.s
src/pkg/runtime/asm_amd64.s
src/pkg/runtime/atomic_arm.c
src/pkg/runtime/chan.c
src/pkg/runtime/mgc0.c
src/pkg/runtime/netpoll.goc
src/pkg/runtime/proc.c
src/pkg/runtime/runtime.h
src/pkg/runtime/sema.goc
src/pkg/runtime/time.goc

index 5c642c0ed82a3775e862a77959531d94a8161e3a..ccd2567fdcbdd8d7093c59819a48d2b2b8962dec 100644 (file)
@@ -483,6 +483,12 @@ TEXT runtime·xchg(SB), NOSPLIT, $0-8
        XCHGL   AX, 0(BX)
        RET
 
+TEXT runtime·xchgp(SB), NOSPLIT, $0-8
+       MOVL    4(SP), BX
+       MOVL    8(SP), AX
+       XCHGL   AX, 0(BX)
+       RET
+
 TEXT runtime·procyield(SB),NOSPLIT,$0-0
        MOVL    4(SP), AX
 again:
index 980bcd4520cbf1f7477ac03c17485cf2ba4e32bf..17e91c04db82d19fea0feab423412afed0c29c5c 100644 (file)
@@ -549,6 +549,12 @@ TEXT runtime·xchg64(SB), NOSPLIT, $0-16
        XCHGQ   AX, 0(BX)
        RET
 
+TEXT runtime·xchgp(SB), NOSPLIT, $0-16
+       MOVQ    8(SP), BX
+       MOVQ    16(SP), AX
+       XCHGQ   AX, 0(BX)
+       RET
+
 TEXT runtime·procyield(SB),NOSPLIT,$0-0
        MOVL    8(SP), AX
 again:
index b1e97b27dd66774be8b70f1bdb0b0817272f736b..87e88d75630e8587431291bd205b8c44cef639a9 100644 (file)
@@ -41,6 +41,19 @@ runtime·xchg(uint32 volatile* addr, uint32 v)
        }
 }
 
+#pragma textflag NOSPLIT
+void*
+runtime·xchgp(void* volatile* addr, void* v)
+{
+       void *old;
+
+       for(;;) {
+               old = *addr;
+               if(runtime·cas(addr, old, v))
+                       return old;
+       }
+}
+
 #pragma textflag NOSPLIT
 void
 runtime·procyield(uint32 cnt)
index bb3388548ddd52a27ffebdcfef14edfaf6448bec..fd382f80f151eed62641019384e84f0be6ef8655 100644 (file)
@@ -224,7 +224,7 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
        mysg.selgen = NOSELGEN;
        g->param = nil;
        enqueue(&c->sendq, &mysg);
-       runtime·park(runtime·unlock, c, "chan send");
+       runtime·parkunlock(c, "chan send");
 
        if(g->param == nil) {
                runtime·lock(c);
@@ -252,7 +252,7 @@ asynch:
                mysg.elem = nil;
                mysg.selgen = NOSELGEN;
                enqueue(&c->sendq, &mysg);
-               runtime·park(runtime·unlock, c, "chan send");
+               runtime·parkunlock(c, "chan send");
 
                runtime·lock(c);
                goto asynch;
@@ -356,7 +356,7 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
        mysg.selgen = NOSELGEN;
        g->param = nil;
        enqueue(&c->recvq, &mysg);
-       runtime·park(runtime·unlock, c, "chan receive");
+       runtime·parkunlock(c, "chan receive");
 
        if(g->param == nil) {
                runtime·lock(c);
@@ -387,7 +387,7 @@ asynch:
                mysg.elem = nil;
                mysg.selgen = NOSELGEN;
                enqueue(&c->recvq, &mysg);
-               runtime·park(runtime·unlock, c, "chan receive");
+               runtime·parkunlock(c, "chan receive");
 
                runtime·lock(c);
                goto asynch;
@@ -799,6 +799,14 @@ selunlock(Select *sel)
        }
 }
 
+static bool
+selparkcommit(G *gp, void *sel)
+{
+       USED(gp);
+       selunlock(sel);
+       return true;
+}
+
 void
 runtime·block(void)
 {
@@ -971,7 +979,7 @@ loop:
        }
 
        g->param = nil;
-       runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
+       runtime·park(selparkcommit, sel, "select");
 
        sellock(sel);
        sg = g->param;
index ebcc364618a0fbb25baa1d7eb0228c0778549a32..2c82fb3ac437e143002bf32e34138ceab77b0ff8 100644 (file)
@@ -2307,7 +2307,7 @@ runfinq(void)
                finq = nil;
                if(fb == nil) {
                        fingwait = 1;
-                       runtime·park(runtime·unlock, &finlock, "finalizer wait");
+                       runtime·parkunlock(&finlock, "finalizer wait");
                        continue;
                }
                runtime·unlock(&finlock);
index 9b5176645a7349ccb1da5927822ee93433375a84..2830f882d806123a431813444ef213bf4ee3b45a 100644 (file)
@@ -19,21 +19,40 @@ package net
 // An implementation must call the following function to denote that the pd is ready.
 // void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);
 
+// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
+// goroutines respectively. The semaphore can be in the following states:
+// READY - io readiness notification is pending;
+//         a goroutine consumes the notification by changing the state to nil.
+// WAIT - a goroutine prepares to park on the semaphore, but not yet parked;
+//        the goroutine commits to park by changing the state to G pointer,
+//        or, alternatively, concurrent io notification changes the state to READY,
+//        or, alternatively, concurrent timeout/close changes the state to nil.
+// G pointer - the goroutine is blocked on the semaphore;
+//             io notification or timeout/close changes the state to READY or nil respectively
+//             and unparks the goroutine.
+// nil - nothing of the above.
 #define READY ((G*)1)
+#define WAIT  ((G*)2)
 
 struct PollDesc
 {
        PollDesc* link; // in pollcache, protected by pollcache.Lock
+
+       // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
+       // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
+       // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO rediness notification)
+       // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
+       // in a lock-free way by all operations.
        Lock;           // protectes the following fields
        uintptr fd;
        bool    closing;
        uintptr seq;    // protects from stale timers and ready notifications
-       G*      rg;     // G waiting for read or READY (binary semaphore)
+       G*      rg;     // READY, WAIT, G waiting for read or nil
        Timer   rt;     // read deadline timer (set if rt.fv != nil)
        int64   rd;     // read deadline
-       G*      wg;     // the same for writes
-       Timer   wt;
-       int64   wd;
+       G*      wg;     // READY, WAIT, G waiting for write or nil
+       Timer   wt;     // write deadline timer
+       int64   wd;     // write deadline
 };
 
 static struct
@@ -47,7 +66,7 @@ static struct
        // seq is incremented when deadlines are changed or descriptor is reused.
 } pollcache;
 
-static bool    netpollblock(PollDesc*, int32);
+static bool    netpollblock(PollDesc*, int32, bool);
 static G*      netpollunblock(PollDesc*, int32, bool);
 static void    deadline(int64, Eface);
 static void    readDeadline(int64, Eface);
@@ -97,7 +116,6 @@ func runtime_pollClose(pd *PollDesc) {
 }
 
 func runtime_pollReset(pd *PollDesc, mode int) (err int) {
-       runtime·lock(pd);
        err = checkerr(pd, mode);
        if(err)
                goto ret;
@@ -106,11 +124,9 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) {
        else if(mode == 'w')
                pd->wg = nil;
 ret:
-       runtime·unlock(pd);
 }
 
 func runtime_pollWait(pd *PollDesc, mode int) (err int) {
-       runtime·lock(pd);
        err = checkerr(pd, mode);
        if(err == 0) {
 #ifdef GOOS_solaris
@@ -119,7 +135,7 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) {
                else if(mode == 'w')
                        runtime·netpollarmwrite(pd->fd);
 #endif
-               while(!netpollblock(pd, mode)) {
+               while(!netpollblock(pd, mode, false)) {
                        err = checkerr(pd, mode);
                        if(err != 0)
                                break;
@@ -128,11 +144,9 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) {
                        // Pretend it has not happened and retry.
                }
        }
-       runtime·unlock(pd);
 }
 
 func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
-       runtime·lock(pd);
 #ifdef GOOS_solaris
        if(mode == 'r')
                runtime·netpollarmread(pd->fd);
@@ -140,9 +154,8 @@ func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
                runtime·netpollarmwrite(pd->fd);
 #endif
        // wait for ioready, ignore closing or timeouts.
-       while(!netpollblock(pd, mode))
+       while(!netpollblock(pd, mode, true))
                ;
-       runtime·unlock(pd);
 }
 
 func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
@@ -197,7 +210,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
        }
        // If we set the new deadline in the past, unblock currently pending IO if any.
        rg = nil;
-       wg = nil;
+       runtime·atomicstorep(&wg, nil);  // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
        if(pd->rd < 0)
                rg = netpollunblock(pd, 'r', false);
        if(pd->wd < 0)
@@ -217,6 +230,7 @@ func runtime_pollUnblock(pd *PollDesc) {
                runtime·throw("runtime_pollUnblock: already closing");
        pd->closing = true;
        pd->seq++;
+       runtime·atomicstorep(&rg, nil);  // full memory barrier between store to closing and read of rg/wg in netpollunblock
        rg = netpollunblock(pd, 'r', false);
        wg = netpollunblock(pd, 'w', false);
        if(pd->rt.fv) {
@@ -247,12 +261,10 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
        G *rg, *wg;
 
        rg = wg = nil;
-       runtime·lock(pd);
        if(mode == 'r' || mode == 'r'+'w')
                rg = netpollunblock(pd, 'r', true);
        if(mode == 'w' || mode == 'r'+'w')
                wg = netpollunblock(pd, 'w', true);
-       runtime·unlock(pd);
        if(rg) {
                rg->schedlink = *gpp;
                *gpp = rg;
@@ -273,51 +285,75 @@ checkerr(PollDesc *pd, int32 mode)
        return 0;
 }
 
+static bool
+blockcommit(G *gp, G **gpp)
+{
+       return runtime·casp(gpp, WAIT, gp);
+}
+
 // returns true if IO is ready, or false if timedout or closed
+// waitio - wait only for completed IO, ignore errors
 static bool
-netpollblock(PollDesc *pd, int32 mode)
+netpollblock(PollDesc *pd, int32 mode, bool waitio)
 {
-       G **gpp;
+       G **gpp, *old;
 
        gpp = &pd->rg;
        if(mode == 'w')
                gpp = &pd->wg;
-       if(*gpp == READY) {
-               *gpp = nil;
-               return true;
+
+       // set the gpp semaphore to WAIT
+       for(;;) {
+               old = *gpp;
+               if(old == READY) {
+                       *gpp = nil;
+                       return true;
+               }
+               if(old != nil)
+                       runtime·throw("netpollblock: double wait");
+               if(runtime·casp(gpp, nil, WAIT))
+                       break;
        }
-       if(*gpp != nil)
-               runtime·throw("netpollblock: double wait");
-       *gpp = g;
-       runtime·park(runtime·unlock, &pd->Lock, "IO wait");
-       runtime·lock(pd);
-       if(g->param)
-               return true;
-       return false;
+
+       // need to recheck error states after setting gpp to WAIT
+       // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
+       // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
+       if(waitio || checkerr(pd, mode) == 0)
+               runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");
+       // be careful to not lose concurrent READY notification
+       old = runtime·xchgp(gpp, nil);
+       if(old > WAIT)
+               runtime·throw("netpollblock: corrupted state");
+       return old == READY;
 }
 
 static G*
 netpollunblock(PollDesc *pd, int32 mode, bool ioready)
 {
-       G **gpp, *old;
+       G **gpp, *old, *new;
 
        gpp = &pd->rg;
        if(mode == 'w')
                gpp = &pd->wg;
-       if(*gpp == READY)
-               return nil;
-       if(*gpp == nil) {
-               // Only set READY for ioready. runtime_pollWait
-               // will check for timeout/cancel before waiting.
+
+       for(;;) {
+               old = *gpp;
+               if(old == READY)
+                       return nil;
+               if(old == nil && !ioready) {
+                       // Only set READY for ioready. runtime_pollWait
+                       // will check for timeout/cancel before waiting.
+                       return nil;
+               }
+               new = nil;
                if(ioready)
-                       *gpp = READY;
-               return nil;
+                       new = READY;
+               if(runtime·casp(gpp, old, new))
+                       break;
        }
-       old = *gpp;
-       // pass unblock reason onto blocked g
-       old->param = (void*)ioready;
-       *gpp = nil;
-       return old;
+       if(old > WAIT)
+               return old;  // must be G*
+       return nil;
 }
 
 static void
@@ -343,14 +379,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write)
                if(pd->rd <= 0 || pd->rt.fv == nil)
                        runtime·throw("deadlineimpl: inconsistent read deadline");
                pd->rd = -1;
-               pd->rt.fv = nil;
+               runtime·atomicstorep(&pd->rt.fv, nil);  // full memory barrier between store to rd and load of rg in netpollunblock
                rg = netpollunblock(pd, 'r', false);
        }
        if(write) {
                if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
                        runtime·throw("deadlineimpl: inconsistent write deadline");
                pd->wd = -1;
-               pd->wt.fv = nil;
+               runtime·atomicstorep(&pd->wt.fv, nil);  // full memory barrier between store to wd and load of wg in netpollunblock
                wg = netpollunblock(pd, 'w', false);
        }
        runtime·unlock(pd);
index 9eb4ad9f9585a015e0ccb4acbd8601c3ecec800e..24feda4183477ae61a98b65b393d13c59ef8ea5f 100644 (file)
@@ -1353,10 +1353,10 @@ top:
        execute(gp);
 }
 
-// Puts the current goroutine into a waiting state and unlocks the lock.
-// The goroutine can be made runnable again by calling runtime·ready(gp).
+// Puts the current goroutine into a waiting state and calls unlockf.
+// If unlockf returns false, the goroutine is resumed.
 void
-runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason)
+runtime·park(bool(*unlockf)(G*, void*), void *lock, int8 *reason)
 {
        m->waitlock = lock;
        m->waitunlockf = unlockf;
@@ -1364,17 +1364,39 @@ runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason)
        runtime·mcall(park0);
 }
 
+static bool
+parkunlock(G *gp, void *lock)
+{
+       USED(gp);
+       runtime·unlock(lock);
+       return true;
+}
+
+// Puts the current goroutine into a waiting state and unlocks the lock.
+// The goroutine can be made runnable again by calling runtime·ready(gp).
+void
+runtime·parkunlock(Lock *lock, int8 *reason)
+{
+       runtime·park(parkunlock, lock, reason);
+}
+
 // runtime·park continuation on g0.
 static void
 park0(G *gp)
 {
+       bool ok;
+
        gp->status = Gwaiting;
        gp->m = nil;
        m->curg = nil;
        if(m->waitunlockf) {
-               m->waitunlockf(m->waitlock);
+               ok = m->waitunlockf(gp, m->waitlock);
                m->waitunlockf = nil;
                m->waitlock = nil;
+               if(!ok) {
+                       gp->status = Grunnable;
+                       execute(gp);  // Schedule it back, never returns.
+               }
        }
        if(m->lockedg) {
                stoplockedm();
index 5e3c0c497f198bca85b2b0540b904a5f5e66c6c2..c4c47964b92e88e671808a1cf7070014c61b4b19 100644 (file)
@@ -339,7 +339,7 @@ struct      M
        GCStats gcstats;
        bool    racecall;
        bool    needextram;
-       void    (*waitunlockf)(Lock*);
+       bool    (*waitunlockf)(G*, void*);
        void*   waitlock;
 
        uintptr settype_buf[1024];
@@ -790,21 +790,6 @@ int32      runtime·read(int32, void*, int32);
 int32  runtime·write(int32, void*, int32);
 int32  runtime·close(int32);
 int32  runtime·mincore(void*, uintptr, byte*);
-bool   runtime·cas(uint32*, uint32, uint32);
-bool   runtime·cas64(uint64*, uint64, uint64);
-bool   runtime·casp(void**, void*, void*);
-// Don't confuse with XADD x86 instruction,
-// this one is actually 'addx', that is, add-and-fetch.
-uint32 runtime·xadd(uint32 volatile*, int32);
-uint64 runtime·xadd64(uint64 volatile*, int64);
-uint32 runtime·xchg(uint32 volatile*, uint32);
-uint64 runtime·xchg64(uint64 volatile*, uint64);
-uint32 runtime·atomicload(uint32 volatile*);
-void   runtime·atomicstore(uint32 volatile*, uint32);
-void   runtime·atomicstore64(uint64 volatile*, uint64);
-uint64 runtime·atomicload64(uint64 volatile*);
-void*  runtime·atomicloadp(void* volatile*);
-void   runtime·atomicstorep(void* volatile*, void*);
 void   runtime·jmpdefer(FuncVal*, void*);
 void   runtime·exit1(int32);
 void   runtime·ready(G*);
@@ -845,14 +830,33 @@ uint32    runtime·fastrand1(void);
 void   runtime·rewindmorestack(Gobuf*);
 int32  runtime·timediv(int64, int32, int32*);
 
-void runtime·setmg(M*, G*);
-void runtime·newextram(void);
+// atomic operations
+bool   runtime·cas(uint32*, uint32, uint32);
+bool   runtime·cas64(uint64*, uint64, uint64);
+bool   runtime·casp(void**, void*, void*);
+// Don't confuse with XADD x86 instruction,
+// this one is actually 'addx', that is, add-and-fetch.
+uint32 runtime·xadd(uint32 volatile*, int32);
+uint64 runtime·xadd64(uint64 volatile*, int64);
+uint32 runtime·xchg(uint32 volatile*, uint32);
+uint64 runtime·xchg64(uint64 volatile*, uint64);
+void*  runtime·xchgp(void* volatile*, void*);
+uint32 runtime·atomicload(uint32 volatile*);
+void   runtime·atomicstore(uint32 volatile*, uint32);
+void   runtime·atomicstore64(uint64 volatile*, uint64);
+uint64 runtime·atomicload64(uint64 volatile*);
+void*  runtime·atomicloadp(void* volatile*);
+void   runtime·atomicstorep(void* volatile*, void*);
+
+void   runtime·setmg(M*, G*);
+void   runtime·newextram(void);
 void   runtime·exit(int32);
 void   runtime·breakpoint(void);
 void   runtime·gosched(void);
 void   runtime·gosched0(G*);
 void   runtime·schedtrace(bool);
-void   runtime·park(void(*)(Lock*), Lock*, int8*);
+void   runtime·park(bool(*)(G*, void*), void*, int8*);
+void   runtime·parkunlock(Lock*, int8*);
 void   runtime·tsleep(int64, int8*);
 M*     runtime·newm(void);
 void   runtime·goexit(void);
index 57f32a0ddbee5d716eeafa101e696a9439993eab..c1e8e4e18b7dbbdaa282a22695f4fa8b527243b0 100644 (file)
@@ -137,7 +137,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile)
                // Any semrelease after the cansemacquire knows we're waiting
                // (we set nwait above), so go to sleep.
                semqueue(root, addr, &s);
-               runtime·park(runtime·unlock, root, "semacquire");
+               runtime·parkunlock(root, "semacquire");
                if(cansemacquire(addr)) {
                        if(t0)
                                runtime·blockevent(s.releasetime - t0, 3);
@@ -254,7 +254,7 @@ func runtime_Syncsemacquire(s *SyncSema) {
                else
                        s->tail->next = &w;
                s->tail = &w;
-               runtime·park(runtime·unlock, s, "semacquire");
+               runtime·parkunlock(s, "semacquire");
                if(t0)
                        runtime·blockevent(w.releasetime - t0, 2);
        }
@@ -288,7 +288,7 @@ func runtime_Syncsemrelease(s *SyncSema, n uint32) {
                else
                        s->tail->next = &w;
                s->tail = &w;
-               runtime·park(runtime·unlock, s, "semarelease");
+               runtime·parkunlock(s, "semarelease");
        } else
                runtime·unlock(s);
 }
index b575696f715c5e7e7e44e2abd417d3b68047e295..d52a3b32174840fae9fbb5d779a8649121e16aa3 100644 (file)
@@ -76,7 +76,7 @@ runtime·tsleep(int64 ns, int8 *reason)
        t.arg.data = g;
        runtime·lock(&timers);
        addtimer(&t);
-       runtime·park(runtime·unlock, &timers, reason);
+       runtime·parkunlock(&timers, reason);
 }
 
 static FuncVal timerprocv = {timerproc};
@@ -222,7 +222,7 @@ timerproc(void)
                if(delta < 0) {
                        // No timers left - put goroutine to sleep.
                        timers.rescheduling = true;
-                       runtime·park(runtime·unlock, &timers, "timer goroutine (idle)");
+                       runtime·parkunlock(&timers, "timer goroutine (idle)");
                        continue;
                }
                // At least one timer pending.  Sleep until then.