XCHGL AX, 0(BX)
RET
+TEXT runtime·xchgp(SB), NOSPLIT, $0-8
+ MOVL 4(SP), BX
+ MOVL 8(SP), AX
+ XCHGL AX, 0(BX)
+ RET
+
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL 4(SP), AX
again:
XCHGQ AX, 0(BX)
RET
+TEXT runtime·xchgp(SB), NOSPLIT, $0-16
+ MOVQ 8(SP), BX
+ MOVQ 16(SP), AX
+ XCHGQ AX, 0(BX)
+ RET
+
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL 8(SP), AX
again:
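
The two assembly bodies above are the fast paths for the new runtime·xchgp primitive: an atomic pointer swap. XCHG with a memory operand is implicitly locked on x86, so each version is also a full memory barrier. A minimal C11 sketch of the contract (names and types are illustrative, not the runtime's Plan 9 C):

#include <stdatomic.h>

/* Illustrative C11 model of runtime·xchgp: atomically swap a pointer and
 * return the previous value; seq_cst mirrors XCHG's full-barrier effect. */
static void *
xchgp(_Atomic(void *) *addr, void *v)
{
	return atomic_exchange_explicit(addr, v, memory_order_seq_cst);
}
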
}
}
+#pragma textflag NOSPLIT
+void*
+runtime·xchgp(void* volatile* addr, void* v)
+{
+ void *old;
+
+ for(;;) {
+ old = *addr;
+ if(runtime·cas(addr, old, v))
+ return old;
+ }
+}
+
#pragma textflag NOSPLIT
void
runtime·procyield(uint32 cnt)
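
For ports without a native swap instruction, the patch falls back to the compare-and-swap loop above. The loop is equivalent to the assembly versions, but unlike XCHG it can retry under contention. The same fallback in portable C11 (illustrative only; note that a failed atomic_compare_exchange_weak refreshes old, so no explicit reload is needed):

#include <stdatomic.h>

static void *
xchgp_casloop(_Atomic(void *) *addr, void *v)
{
	void *old = atomic_load(addr);
	while (!atomic_compare_exchange_weak(addr, &old, v))
		;	/* old now holds the fresh value; retry the swap */
	return old;
}
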
mysg.selgen = NOSELGEN;
g->param = nil;
enqueue(&c->sendq, &mysg);
- runtime·park(runtime·unlock, c, "chan send");
+ runtime·parkunlock(c, "chan send");
if(g->param == nil) {
runtime·lock(c);
mysg.elem = nil;
mysg.selgen = NOSELGEN;
enqueue(&c->sendq, &mysg);
- runtime·park(runtime·unlock, c, "chan send");
+ runtime·parkunlock(c, "chan send");
runtime·lock(c);
goto asynch;
mysg.selgen = NOSELGEN;
g->param = nil;
enqueue(&c->recvq, &mysg);
- runtime·park(runtime·unlock, c, "chan receive");
+ runtime·parkunlock(c, "chan receive");
if(g->param == nil) {
runtime·lock(c);
mysg.elem = nil;
mysg.selgen = NOSELGEN;
enqueue(&c->recvq, &mysg);
- runtime·park(runtime·unlock, c, "chan receive");
+ runtime·parkunlock(c, "chan receive");
runtime·lock(c);
goto asynch;
}
}
+static bool
+selparkcommit(G *gp, void *sel)
+{
+ USED(gp);
+ selunlock(sel);
+ return true;
+}
+
void
runtime·block(void)
{
}
g->param = nil;
- runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
+ runtime·park(selparkcommit, sel, "select");
sellock(sel);
sg = g->param;
finq = nil;
if(fb == nil) {
fingwait = 1;
- runtime·park(runtime·unlock, &finlock, "finalizer wait");
+ runtime·parkunlock(&finlock, "finalizer wait");
continue;
}
runtime·unlock(&finlock);
// An implementation must call the following function to denote that the pd is ready.
// void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);
+// PollDesc contains two binary semaphores, rg and wg, to park reader and writer
+// goroutines respectively. The semaphore can be in the following states:
+// READY - io readiness notification is pending;
+// a goroutine consumes the notification by changing the state to nil.
+// WAIT - a goroutine prepares to park on the semaphore, but is not yet parked;
+// the goroutine commits to park by changing the state to G pointer,
+// or, alternatively, concurrent io notification changes the state to READY,
+// or, alternatively, concurrent timeout/close changes the state to nil.
+// G pointer - the goroutine is blocked on the semaphore;
+// io notification or timeout/close changes the state to READY or nil respectively
+// and unparks the goroutine.
+// nil - nothing of the above.
#define READY ((G*)1)
+#define WAIT ((G*)2)
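
READY and WAIT share the pointer-sized rg/wg word with nil and real G pointers. No valid G can live at address 1 or 2, which is what licenses the old > WAIT comparisons later in the patch. An illustrative C rendering of the encoding (the cast to uintptr_t keeps the comparison well-defined in standard C):

#include <stdint.h>

typedef struct G G;		/* opaque goroutine, as in the runtime */

#define SEMA_NIL	((G*)0)	/* nothing pending */
#define SEMA_READY	((G*)1)	/* io notification waiting to be consumed */
#define SEMA_WAIT	((G*)2)	/* a goroutine is committing to park */

/* Anything above WAIT must be a real, parked G. */
static int
is_parked(G *state)
{
	return (uintptr_t)state > (uintptr_t)SEMA_WAIT;
}
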
struct PollDesc
{
PollDesc* link; // in pollcache, protected by pollcache.Lock
+
+ // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
+ // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
+// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
+ // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
+ // in a lock-free way by all operations.
Lock; // protects the following fields
uintptr fd;
bool closing;
uintptr seq; // protects from stale timers and ready notifications
- G* rg; // G waiting for read or READY (binary semaphore)
+ G* rg; // READY, WAIT, G waiting for read or nil
Timer rt; // read deadline timer (set if rt.fv != nil)
int64 rd; // read deadline
- G* wg; // the same for writes
- Timer wt;
- int64 wd;
+ G* wg; // READY, WAIT, G waiting for write or nil
+ Timer wt; // write deadline timer
+ int64 wd; // write deadline
};
static struct
// seq is incremented when deadlines are changed or descriptor is reused.
} pollcache;
-static bool netpollblock(PollDesc*, int32);
+static bool netpollblock(PollDesc*, int32, bool);
static G* netpollunblock(PollDesc*, int32, bool);
static void deadline(int64, Eface);
static void readDeadline(int64, Eface);
}
func runtime_pollReset(pd *PollDesc, mode int) (err int) {
- runtime·lock(pd);
err = checkerr(pd, mode);
if(err)
goto ret;
else if(mode == 'w')
pd->wg = nil;
ret:
- runtime·unlock(pd);
}
func runtime_pollWait(pd *PollDesc, mode int) (err int) {
- runtime·lock(pd);
err = checkerr(pd, mode);
if(err == 0) {
#ifdef GOOS_solaris
else if(mode == 'w')
runtime·netpollarmwrite(pd->fd);
#endif
- while(!netpollblock(pd, mode)) {
+ while(!netpollblock(pd, mode, false)) {
err = checkerr(pd, mode);
if(err != 0)
break;
// Pretend it has not happened and retry.
}
}
- runtime·unlock(pd);
}
func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
- runtime·lock(pd);
#ifdef GOOS_solaris
if(mode == 'r')
runtime·netpollarmread(pd->fd);
runtime·netpollarmwrite(pd->fd);
#endif
// wait for ioready, ignore closing or timeouts.
- while(!netpollblock(pd, mode))
+ while(!netpollblock(pd, mode, true))
;
- runtime·unlock(pd);
}
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
}
// If we set the new deadline in the past, unblock currently pending IO if any.
rg = nil;
- wg = nil;
+ runtime·atomicstorep(&wg, nil); // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
if(pd->rd < 0)
rg = netpollunblock(pd, 'r', false);
if(pd->wd < 0)
runtime·throw("runtime_pollUnblock: already closing");
pd->closing = true;
pd->seq++;
+ runtime·atomicstorep(&rg, nil); // full memory barrier between store to closing and read of rg/wg in netpollunblock
rg = netpollunblock(pd, 'r', false);
wg = netpollunblock(pd, 'w', false);
if(pd->rt.fv) {
G *rg, *wg;
rg = wg = nil;
- runtime·lock(pd);
if(mode == 'r' || mode == 'r'+'w')
rg = netpollunblock(pd, 'r', true);
if(mode == 'w' || mode == 'r'+'w')
wg = netpollunblock(pd, 'w', true);
- runtime·unlock(pd);
if(rg) {
rg->schedlink = *gpp;
*gpp = rg;
return 0;
}
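
The runtime·atomicstorep calls to dead locals in the hunks above exist purely for their full-barrier side effect: each unblock path stores a flag (closing, or a negative rd/wd), fences, then loads rg/wg, while netpollblock performs the mirror image (CAS the word to WAIT, itself a full barrier, then re-check the flags). This is the classic Dekker-style handshake: whatever the interleaving, at least one side observes the other, so a goroutine can never park after its wakeup condition was published. A self-contained C11 model (names are stand-ins, not runtime code):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static atomic_bool closing;	/* stands in for pd->closing (or rd/wd < 0) */
static atomic_uintptr_t gpp;	/* stands in for pd->rg / pd->wg */

/* pollUnblock/deadline side: store, full barrier, load. */
static bool
unblock_sees_waiter(void)
{
	atomic_store(&closing, true);
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load(&gpp) != 0;
}

/* netpollblock side: install WAIT (a full barrier), then recheck. */
static bool
may_park(void)
{
	uintptr_t expect = 0;
	if (!atomic_compare_exchange_strong(&gpp, &expect, 2 /* WAIT */))
		return false;		/* lost a race; real code loops */
	return !atomic_load(&closing);	/* park only if no shutdown was published */
}
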
+static bool
+blockcommit(G *gp, G **gpp)
+{
+ return runtime·casp(gpp, WAIT, gp);
+}
+
// returns true if IO is ready, or false if timed out or closed
+// waitio - wait only for completed IO, ignore errors
static bool
-netpollblock(PollDesc *pd, int32 mode)
+netpollblock(PollDesc *pd, int32 mode, bool waitio)
{
- G **gpp;
+ G **gpp, *old;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
- if(*gpp == READY) {
- *gpp = nil;
- return true;
+
+ // set the gpp semaphore to WAIT
+ for(;;) {
+ old = *gpp;
+ if(old == READY) {
+ *gpp = nil;
+ return true;
+ }
+ if(old != nil)
+ runtime·throw("netpollblock: double wait");
+ if(runtime·casp(gpp, nil, WAIT))
+ break;
}
- if(*gpp != nil)
- runtime·throw("netpollblock: double wait");
- *gpp = g;
- runtime·park(runtime·unlock, &pd->Lock, "IO wait");
- runtime·lock(pd);
- if(g->param)
- return true;
- return false;
+
+ // need to recheck error states after setting gpp to WAIT
+ // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
+ // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
+ if(waitio || checkerr(pd, mode) == 0)
+ runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");
+ // be careful not to lose a concurrent READY notification
+ old = runtime·xchgp(gpp, nil);
+ if(old > WAIT)
+ runtime·throw("netpollblock: corrupted state");
+ return old == READY;
}
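
The closing runtime·xchgp is what makes the exit path race-free: between the CAS to WAIT and this point, a concurrent netpollunblock may have flipped the word to READY (even if the park was skipped or blockcommit failed). Swapping nil in atomically both clears the semaphore for the next waiter and reports, in one indivisible step, whether that notification arrived. Sketch in C11, reusing the state encoding from the PollDesc comment (illustrative only):

#include <stdatomic.h>
#include <stdint.h>

static atomic_uintptr_t gpp;	/* pd->rg or pd->wg stand-in */

static int
consume_notification(void)
{
	uintptr_t old = atomic_exchange(&gpp, 0);	/* runtime·xchgp(gpp, nil) */
	return old == 1;	/* READY: io became ready while we raced */
}
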
static G*
netpollunblock(PollDesc *pd, int32 mode, bool ioready)
{
- G **gpp, *old;
+ G **gpp, *old, *new;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
- if(*gpp == READY)
- return nil;
- if(*gpp == nil) {
- // Only set READY for ioready. runtime_pollWait
- // will check for timeout/cancel before waiting.
+
+ for(;;) {
+ old = *gpp;
+ if(old == READY)
+ return nil;
+ if(old == nil && !ioready) {
+ // Only set READY for ioready. runtime_pollWait
+ // will check for timeout/cancel before waiting.
+ return nil;
+ }
+ new = nil;
if(ioready)
- *gpp = READY;
- return nil;
+ new = READY;
+ if(runtime·casp(gpp, old, new))
+ break;
}
- old = *gpp;
- // pass unblock reason onto blocked g
- old->param = (void*)ioready;
- *gpp = nil;
- return old;
+ if(old > WAIT)
+ return old; // must be G*
+ return nil;
}
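
Taken together, netpollblock and netpollunblock form a small state machine on the rg/wg word. A single-threaded walk through the three interesting cases, modeling the unblock transition function (assertions encode the expected post-states; purely illustrative, G* replaced by an arbitrary address):

#include <assert.h>
#include <stdint.h>

#define NIL	((uintptr_t)0)
#define READY	((uintptr_t)1)
#define WAIT	((uintptr_t)2)

static uintptr_t
unblock(uintptr_t *gpp, int ioready)	/* netpollunblock analogue */
{
	uintptr_t old = *gpp;
	if (old == READY) return NIL;		/* notification already pending */
	if (old == NIL && !ioready) return NIL;	/* nobody to cancel */
	*gpp = ioready ? READY : NIL;
	return old > WAIT ? old : NIL;		/* a real G* must be woken */
}

int
main(void)
{
	uintptr_t sem, g = (uintptr_t)&sem;	/* any address > WAIT works */

	/* 1: notification arrives first; a later netpollblock eats READY
	   without parking. */
	sem = NIL;
	assert(unblock(&sem, 1) == NIL && sem == READY);

	/* 2: a goroutine is already parked; io readiness hands the G back
	   to the poller and leaves READY behind for the woken goroutine's
	   final xchgp in netpollblock. */
	sem = g;
	assert(unblock(&sem, 1) == g && sem == READY);

	/* 3: timeout/close on a parked goroutine: wake it, leave the word
	   nil, so its final xchgp reports "not ready". */
	sem = g;
	assert(unblock(&sem, 0) == g && sem == NIL);
	return 0;
}
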
static void
if(pd->rd <= 0 || pd->rt.fv == nil)
runtime·throw("deadlineimpl: inconsistent read deadline");
pd->rd = -1;
- pd->rt.fv = nil;
+ runtime·atomicstorep(&pd->rt.fv, nil); // full memory barrier between store to rd and load of rg in netpollunblock
rg = netpollunblock(pd, 'r', false);
}
if(write) {
if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
runtime·throw("deadlineimpl: inconsistent write deadline");
pd->wd = -1;
- pd->wt.fv = nil;
+ runtime·atomicstorep(&pd->wt.fv, nil); // full memory barrier between store to wd and load of wg in netpollunblock
wg = netpollunblock(pd, 'w', false);
}
runtime·unlock(pd);
execute(gp);
}
-// Puts the current goroutine into a waiting state and unlocks the lock.
-// The goroutine can be made runnable again by calling runtime·ready(gp).
+// Puts the current goroutine into a waiting state and calls unlockf.
+// If unlockf returns false, the goroutine is resumed.
void
-runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason)
+runtime·park(bool(*unlockf)(G*, void*), void *lock, int8 *reason)
{
m->waitlock = lock;
m->waitunlockf = unlockf;
runtime·mcall(park0);
}
+static bool
+parkunlock(G *gp, void *lock)
+{
+ USED(gp);
+ runtime·unlock(lock);
+ return true;
+}
+
+// Puts the current goroutine into a waiting state and unlocks the lock.
+// The goroutine can be made runnable again by calling runtime·ready(gp).
+void
+runtime·parkunlock(Lock *lock, int8 *reason)
+{
+ runtime·park(parkunlock, lock, reason);
+}
+
// runtime·park continuation on g0.
static void
park0(G *gp)
{
+ bool ok;
+
gp->status = Gwaiting;
gp->m = nil;
m->curg = nil;
if(m->waitunlockf) {
- m->waitunlockf(m->waitlock);
+ ok = m->waitunlockf(gp, m->waitlock);
m->waitunlockf = nil;
m->waitlock = nil;
+ if(!ok) {
+ gp->status = Grunnable;
+ execute(gp); // Schedule it back, never returns.
+ }
}
if(m->lockedg) {
stoplockedm();
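
park0 now runs the commit callback after the goroutine is already marked Gwaiting and detached from the M, and a false return aborts the park. This is what lets netpollblock's blockcommit refuse to sleep when a READY notification sneaks in between the CAS to WAIT and the actual park. A miniature model of the contract (hypothetical names, single-threaded; the real scheduler switches to g0 via mcall rather than calling directly):

#include <stdbool.h>
#include <stdio.h>

typedef struct G { int status; } G;
enum { Grunnable, Grunning, Gwaiting };

static bool
park(G *gp, bool (*commitf)(G*, void*), void *arg)
{
	gp->status = Gwaiting;		/* published before the commit runs */
	if (!commitf(gp, arg)) {	/* commit refused: undo the park */
		gp->status = Grunnable;	/* park0's !ok path */
		return false;
	}
	return true;			/* stays parked until ready(gp) */
}

static bool
commit_if_wait(G *gp, void *slot)	/* blockcommit analogue */
{
	G **gpp = slot;
	if (*gpp != (G*)2 /* WAIT */)
		return false;		/* concurrent READY/nil won the race */
	*gpp = gp;			/* install the parked G */
	return true;
}

int
main(void)
{
	G g = { Grunning };
	G *sem = (G*)2;			/* we advertised WAIT earlier */
	printf("parked: %d\n", park(&g, commit_if_wait, &sem));
	return 0;
}
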
GCStats gcstats;
bool racecall;
bool needextram;
- void (*waitunlockf)(Lock*);
+ bool (*waitunlockf)(G*, void*);
void* waitlock;
uintptr settype_buf[1024];
int32 runtime·write(int32, void*, int32);
int32 runtime·close(int32);
int32 runtime·mincore(void*, uintptr, byte*);
-bool runtime·cas(uint32*, uint32, uint32);
-bool runtime·cas64(uint64*, uint64, uint64);
-bool runtime·casp(void**, void*, void*);
-// Don't confuse with XADD x86 instruction,
-// this one is actually 'addx', that is, add-and-fetch.
-uint32 runtime·xadd(uint32 volatile*, int32);
-uint64 runtime·xadd64(uint64 volatile*, int64);
-uint32 runtime·xchg(uint32 volatile*, uint32);
-uint64 runtime·xchg64(uint64 volatile*, uint64);
-uint32 runtime·atomicload(uint32 volatile*);
-void runtime·atomicstore(uint32 volatile*, uint32);
-void runtime·atomicstore64(uint64 volatile*, uint64);
-uint64 runtime·atomicload64(uint64 volatile*);
-void* runtime·atomicloadp(void* volatile*);
-void runtime·atomicstorep(void* volatile*, void*);
void runtime·jmpdefer(FuncVal*, void*);
void runtime·exit1(int32);
void runtime·ready(G*);
void runtime·rewindmorestack(Gobuf*);
int32 runtime·timediv(int64, int32, int32*);
-void runtime·setmg(M*, G*);
-void runtime·newextram(void);
+// atomic operations
+bool runtime·cas(uint32*, uint32, uint32);
+bool runtime·cas64(uint64*, uint64, uint64);
+bool runtime·casp(void**, void*, void*);
+// Don't confuse with XADD x86 instruction,
+// this one is actually 'addx', that is, add-and-fetch.
+uint32 runtime·xadd(uint32 volatile*, int32);
+uint64 runtime·xadd64(uint64 volatile*, int64);
+uint32 runtime·xchg(uint32 volatile*, uint32);
+uint64 runtime·xchg64(uint64 volatile*, uint64);
+void* runtime·xchgp(void* volatile*, void*);
+uint32 runtime·atomicload(uint32 volatile*);
+void runtime·atomicstore(uint32 volatile*, uint32);
+void runtime·atomicstore64(uint64 volatile*, uint64);
+uint64 runtime·atomicload64(uint64 volatile*);
+void* runtime·atomicloadp(void* volatile*);
+void runtime·atomicstorep(void* volatile*, void*);
+
+void runtime·setmg(M*, G*);
+void runtime·newextram(void);
void runtime·exit(int32);
void runtime·breakpoint(void);
void runtime·gosched(void);
void runtime·gosched0(G*);
void runtime·schedtrace(bool);
-void runtime·park(void(*)(Lock*), Lock*, int8*);
+void runtime·park(bool(*)(G*, void*), void*, int8*);
+void runtime·parkunlock(Lock*, int8*);
void runtime·tsleep(int64, int8*);
M* runtime·newm(void);
void runtime·goexit(void);
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
semqueue(root, addr, &s);
- runtime·park(runtime·unlock, root, "semacquire");
+ runtime·parkunlock(root, "semacquire");
if(cansemacquire(addr)) {
if(t0)
runtime·blockevent(s.releasetime - t0, 3);
else
s->tail->next = &w;
s->tail = &w;
- runtime·park(runtime·unlock, s, "semacquire");
+ runtime·parkunlock(s, "semacquire");
if(t0)
runtime·blockevent(w.releasetime - t0, 2);
}
else
s->tail->next = &w;
s->tail = &w;
- runtime·park(runtime·unlock, s, "semarelease");
+ runtime·parkunlock(s, "semarelease");
} else
runtime·unlock(s);
}
t.arg.data = g;
runtime·lock(&timers);
addtimer(&t);
- runtime·park(runtime·unlock, &timers, reason);
+ runtime·parkunlock(&timers, reason);
}
static FuncVal timerprocv = {timerproc};
if(delta < 0) {
// No timers left - put goroutine to sleep.
timers.rescheduling = true;
- runtime·park(runtime·unlock, &timers, "timer goroutine (idle)");
+ runtime·parkunlock(&timers, "timer goroutine (idle)");
continue;
}
// At least one timer pending. Sleep until then.