void runtime·bsdthread_register(void);
int32 runtime·mach_msg_trap(MachHeader*, int32, uint32, uint32, uint32, uint32, uint32);
uint32 runtime·mach_reply_port(void);
-void runtime·mach_semacquire(uint32);
+int32 runtime·mach_semacquire(uint32, int64);
uint32 runtime·mach_semcreate(void);
void runtime·mach_semdestroy(uint32);
void runtime·mach_semrelease(uint32);
*(int32*)1231 = 1231;
}
-void
-runtime·semasleep(void)
+int32
+runtime·semasleep(int64 ns)
{
- runtime·mach_semacquire(m->waitsema);
+ return runtime·mach_semacquire(m->waitsema, ns);
}
void
// Mach calls that get interrupted by Unix signals
// return this error code. We retry them.
KERN_ABORTED = 14,
+ KERN_OPERATION_TIMED_OUT = 49,
};
typedef struct Tmach_semcreateMsg Tmach_semcreateMsg;
int32 runtime·mach_semaphore_signal(uint32 sema);
int32 runtime·mach_semaphore_signal_all(uint32 sema);
-void
-runtime·mach_semacquire(uint32 sem)
+int32
+runtime·mach_semacquire(uint32 sem, int64 ns)
{
int32 r;
+ if(ns >= 0) {
+ r = runtime·mach_semaphore_timedwait(sem, ns/1000000000LL, ns%1000000000LL);
+ if(r == KERN_ABORTED || r == KERN_OPERATION_TIMED_OUT)
+ return -1;
+ if(r != 0)
+ macherror(r, "semaphore_wait");
+ return 0;
+ }
while((r = runtime·mach_semaphore_wait(sem)) != 0) {
if(r == KERN_ABORTED) // interrupted
continue;
macherror(r, "semaphore_wait");
}
+ return 0;
}
void
extern int32 runtime·sys_umtx_op(uint32*, int32, uint32, void*, void*);
// FreeBSD's umtx_op syscall is effectively the same as Linux's futex, and
-// thus the code is largely similar. See linux/thread.c for comments.
+// thus the code is largely similar. See linux/thread.c and lock_futex.c for comments.
void
-runtime·futexsleep(uint32 *addr, uint32 val)
+runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
int32 ret;
+ Timespec ts, *tsp;
+
+ if(ns < 0)
+ tsp = nil;
+ else {
+ ts.sec = ns / 1000000000LL;
+ ts.nsec = ns % 1000000000LL;
+ tsp = &ts;
+ }
- ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, nil);
+ ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, tsp);
if(ret >= 0 || ret == -EINTR)
return;
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
+// Don't sleep longer than ns; ns < 0 means forever.
void
-runtime·futexsleep(uint32 *addr, uint32 val)
+runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
+ Timespec ts, *tsp;
+
+ if(ns < 0)
+ tsp = nil;
+ else {
+ ts.tv_sec = ns/1000000000LL;
+ ts.tv_nsec = ns%1000000000LL;
+ // Avoid overflow
+ if(ts.tv_sec > 1<<30)
+ ts.tv_sec = 1<<30;
+ tsp = &ts;
+ }
+
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
- runtime·futex(addr, FUTEX_WAIT, val, nil, nil, 0);
+ runtime·futex(addr, FUTEX_WAIT, val, tsp, nil, 0);
}
// If any procs are sleeping on addr, wake up at most cnt.
#include "runtime.h"
+// This implementation depends on OS-specific implementations of
+//
+// runtime.futexsleep(uint32 *addr, uint32 val, int64 ns)
+// Atomically,
+// if(*addr == val) sleep
+// Might be woken up spuriously; that's allowed.
+// Don't sleep longer than ns; ns < 0 means forever.
+//
+// runtime.futexwakeup(uint32 *addr, uint32 cnt)
+// If any procs are sleeping on addr, wake up at most cnt.
+
enum
{
MUTEX_UNLOCKED = 0,
MUTEX_LOCKED = 1,
MUTEX_SLEEPING = 2,
-
+
ACTIVE_SPIN = 4,
ACTIVE_SPIN_CNT = 30,
PASSIVE_SPIN = 1,
};
-// Atomically,
-// if(*addr == val) sleep
-// Might be woken up spuriously; that's allowed.
-void runtime·futexsleep(uint32 *addr, uint32 val);
-
-// If any procs are sleeping on addr, wake up at most cnt.
-void runtime·futexwakeup(uint32 *addr, uint32 cnt);
-
// Possible lock states are MUTEX_UNLOCKED, MUTEX_LOCKED and MUTEX_SLEEPING.
// MUTEX_SLEEPING means that there is presumably at least one sleeping thread.
// Note that there can be spinning threads during all states - they do not
v = runtime·xchg(&l->key, MUTEX_LOCKED);
if(v == MUTEX_UNLOCKED)
return;
-
+
// wait is either MUTEX_LOCKED or MUTEX_SLEEPING
// depending on whether there is a thread sleeping
// on this mutex. If we ever change l->key from
// returning, to ensure that the sleeping thread gets
// its wakeup call.
wait = v;
-
+
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin = 0;
if(runtime·ncpu > 1)
spin = ACTIVE_SPIN;
-
+
for(;;) {
// Try for lock, spinning.
for(i = 0; i < spin; i++) {
return;
runtime·procyield(ACTIVE_SPIN_CNT);
}
-
+
// Try for lock, rescheduling.
for(i=0; i < PASSIVE_SPIN; i++) {
while(l->key == MUTEX_UNLOCKED)
return;
runtime·osyield();
}
-
+
// Sleep.
v = runtime·xchg(&l->key, MUTEX_SLEEPING);
if(v == MUTEX_UNLOCKED)
return;
wait = MUTEX_SLEEPING;
- runtime·futexsleep(&l->key, MUTEX_SLEEPING);
+ runtime·futexsleep(&l->key, MUTEX_SLEEPING, -1);
}
}
runtime·notesleep(Note *n)
{
while(runtime·atomicload(&n->key) == 0)
- runtime·futexsleep(&n->key, 0);
+ runtime·futexsleep(&n->key, 0, -1);
+}
+
+void
+runtime·notetsleep(Note *n, int64 ns)
+{
+ int64 deadline, now;
+
+ if(ns < 0) {
+ runtime·notesleep(n);
+ return;
+ }
+
+ if(runtime·atomicload(&n->key) != 0)
+ return;
+
+ deadline = runtime·nanotime() + ns;
+ for(;;) {
+ runtime·futexsleep(&n->key, 0, ns);
+ if(runtime·atomicload(&n->key) != 0)
+ return;
+ now = runtime·nanotime();
+ if(now >= deadline)
+ return;
+ ns = deadline - now;
+ }
}
#include "runtime.h"
+// This implementation depends on OS-specific implementations of
+//
+// uintptr runtime.semacreate(void)
+// Create a semaphore, which will be assigned to m->waitsema.
+// The zero value is treated as absence of any semaphore,
+// so be sure to return a non-zero value.
+//
+// int32 runtime.semasleep(int64 ns)
+// If ns < 0, acquire m->waitsema and return 0.
+// If ns >= 0, try to acquire m->waitsema for at most ns nanoseconds.
+// Return 0 if the semaphore was acquired, -1 if interrupted or timed out.
+//
+// int32 runtime.semawakeup(M *mp)
+// Wake up mp, which is or will soon be sleeping on mp->waitsema.
+//
+
enum
{
LOCKED = 1,
PASSIVE_SPIN = 1,
};
-// creates per-M semaphore (must not return 0)
-uintptr runtime·semacreate(void);
-// acquires per-M semaphore
-void runtime·semasleep(void);
-// releases mp's per-M semaphore
-void runtime·semawakeup(M *mp);
-
void
runtime·lock(Lock *l)
{
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
-
+
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin = 0;
if(runtime·ncpu > 1)
spin = ACTIVE_SPIN;
-
+
for(i=0;; i++) {
v = (uintptr)runtime·atomicloadp(&l->waitm);
if((v&LOCKED) == 0) {
goto unlocked;
}
if(v&LOCKED) {
- // Wait.
- runtime·semasleep();
+ // Queued. Wait.
+ runtime·semasleep(-1);
i = 0;
}
- }
+ }
}
}
// Dequeue an M.
mp = (void*)(v&~LOCKED);
if(runtime·casp(&l->waitm, (void*)v, mp->nextwaitm)) {
- // Wake that M.
+ // Dequeued an M. Wake it.
runtime·semawakeup(mp);
break;
}
void
runtime·notewakeup(Note *n)
{
- if(runtime·casp(&n->waitm, nil, (void*)LOCKED))
- return;
- runtime·semawakeup(n->waitm);
+ M *mp;
+
+ do
+ mp = runtime·atomicloadp(&n->waitm);
+ while(!runtime·casp(&n->waitm, mp, (void*)LOCKED));
+
+ // Successfully set waitm to LOCKED.
+ // What was it before?
+ if(mp == nil) {
+ // Nothing was waiting. Done.
+ } else if(mp == (M*)LOCKED) {
+ // Two notewakeups! Not allowed.
+ runtime·throw("notewakeup - double wakeup");
+ } else {
+ // Must be the waiting m. Wake it up.
+ runtime·semawakeup(mp);
+ }
}
void
{
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
- if(runtime·casp(&n->waitm, nil, m))
- runtime·semasleep();
+ if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup)
+ if(n->waitm != (void*)LOCKED)
+ runtime·throw("notesleep - waitm out of sync");
+ return;
+ }
+ // Queued. Sleep.
+ runtime·semasleep(-1);
+}
+
+void
+runtime·notetsleep(Note *n, int64 ns)
+{
+ M *mp;
+ int64 deadline, now;
+
+ if(ns < 0) {
+ runtime·notesleep(n);
+ return;
+ }
+
+ if(m->waitsema == 0)
+ m->waitsema = runtime·semacreate();
+
+ // Register for wakeup on n->waitm.
+ if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup already)
+ if(n->waitm != (void*)LOCKED)
+ runtime·throw("notetsleep - waitm out of sync");
+ return;
+ }
+
+ deadline = runtime·nanotime() + ns;
+ for(;;) {
+ // Registered. Sleep.
+ if(runtime·semasleep(ns) >= 0) {
+ // Acquired semaphore, semawakeup unregistered us.
+ // Done.
+ return;
+ }
+
+ // Interrupted or timed out. Still registered. Semaphore not acquired.
+ now = runtime·nanotime();
+ if(now >= deadline)
+ break;
+
+ // Deadline hasn't arrived. Keep sleeping.
+ ns = deadline - now;
+ }
+
+ // Deadline arrived. Still registered. Semaphore not acquired.
+ // Want to give up and return, but have to unregister first,
+ // so that any notewakeup racing with the return does not
+ // try to grant us the semaphore when we don't expect it.
+ for(;;) {
+ mp = runtime·atomicloadp(&n->waitm);
+ if(mp == m) {
+ // No wakeup yet; unregister if possible.
+ if(runtime·casp(&n->waitm, mp, nil))
+ return;
+ } else if(mp == (M*)LOCKED) {
+ // Wakeup happened so semaphore is available.
+ // Grab it to avoid getting out of sync.
+ if(runtime·semasleep(-1) < 0)
+ runtime·throw("runtime: unable to acquire - semaphore out of sync");
+ return;
+ } else {
+ runtime·throw("runtime: unexpected waitm - semaphore out of sync");
+ }
+ }
}
return 1;
}
-void
-runtime·semasleep(void)
+int32
+runtime·semasleep(int64 ns)
{
-retry:
+ Timespec ts;
+
// spin-mutex lock
while(runtime·xchg(&m->waitsemalock, 1))
runtime·osyield();
- if(m->waitsemacount == 0) {
- // the function unlocks the spinlock
- runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
- goto retry;
+
+ for(;;) {
+ // lock held
+ if(m->waitsemacount == 0) {
+ // sleep until semaphore != 0 or timeout.
+ // thrsleep unlocks m->waitsemalock.
+ if(ns < 0)
+ runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
+ else {
+ ts.tv_sec = ns/1000000000LL;
+ ts.tv_nsec = ns%1000000000LL;
+ runtime·thrsleep(&m->waitsemacount, CLOCK_REALTIME, &ts, &m->waitsemalock);
+ }
+ // reacquire lock
+ while(runtime·xchg(&m->waitsemalock, 1))
+ runtime·osyield();
+ }
+
+ // lock held (again)
+ if(m->waitsemacount != 0) {
+ // semaphore is available.
+ m->waitsemacount--;
+ // spin-mutex unlock
+ runtime·atomicstore(&m->waitsemalock, 0);
+ return 0; // semaphore acquired
+ }
+
+ // semaphore not available.
+ // if there is a timeout, stop now.
+ // otherwise keep trying.
+ if(ns >= 0)
+ break;
}
- m->waitsemacount--;
+
+ // lock held but giving up
// spin-mutex unlock
runtime·atomicstore(&m->waitsemalock, 0);
+ return -1;
}
void
uint8 tmp[16];
uint8 *p, *q;
int32 pid;
-
+
runtime·memclr(buf, sizeof buf);
runtime·memclr(tmp, sizeof tmp);
pid = _tos->pid;
for(q--; q >= tmp;)
*p++ = *q--;
runtime·memmove((void*)p, (void*)"/notepg", 7);
-
+
/* post interrupt note */
fd = runtime·open(buf, OWRITE);
runtime·write(fd, "interrupt", 9);
if(0){
runtime·printf("newosproc stk=%p m=%p g=%p fn=%p rfork=%p id=%d/%d ostk=%p\n",
stk, m, g, fn, runtime·rfork, m->id, m->tls[0], &m);
- }
-
+ }
+
if(runtime·rfork(RFPROC|RFMEM|RFNOWAIT, stk, m, g, fn) < 0)
runtime·throw("newosproc: rfork failed");
}
return 1;
}
-void
-runtime·semasleep(void)
+int32
+runtime·semasleep(int64 ns)
{
+ int32 ret;
+ int32 ms;
+
+ if(ns >= 0) {
+ // TODO: Plan 9 needs a new system call, tsemacquire.
+ // The kernel implementation is the same as semacquire
+ // except with a tsleep and check for timeout.
+ // It would be great if the implementation returned the
+ // value that was added to the semaphore, so that on
+ // timeout the return value would be 0, on success 1.
+ // Then the error string does not have to be parsed
+ // to detect timeout.
+ //
+ // If a negative time indicates no timeout, then
+ // semacquire can be implemented (in the kernel)
+ // as tsemacquire(p, v, -1).
+ runtime·throw("semasleep: timed sleep not implemented on Plan 9");
+
+ /*
+ if(ns < 0)
+ ms = -1;
+ else if(ns/1000 > 0x7fffffffll)
+ ms = 0x7fffffff;
+ else
+ ms = ns/1000;
+ ret = runtime·plan9_tsemacquire(&m->waitsemacount, 1, ms);
+ if(ret == 1)
+ return 0; // success
+ return -1; // timeout or interrupted
+ */
+ }
+
while(runtime·plan9_semacquire(&m->waitsemacount, 1) < 0) {
/* interrupted; try again */
}
+ return 0; // success
}
void
static void schedule(G*);
static void acquireproc(void);
static void releaseproc(void);
-static M *startm(void);
typedef struct Sched Sched;
volatile uint32 atomic; // atomic scheduling word (see below)
int32 profilehz; // cpu profiling rate
-
+
bool init; // running initialization
bool lockmain; // init called runtime.LockOSThread
// but m is not running a specific goroutine,
// so set the helpgc flag as a signal to m's
// first schedule(nil) to mcpu-- and grunning--.
- m = startm();
+ m = runtime·newm();
m->helpgc = 1;
runtime·sched.grunning++;
}
// Find the m that will run gp.
if((mp = mget(gp)) == nil)
- mp = startm();
+ mp = runtime·newm();
mnextg(mp, gp);
}
}
// Create a new m. It will start off with a call to runtime·mstart.
-static M*
-startm(void)
+M*
+runtime·newm(void)
{
M *m;
typedef struct Complex64 Complex64;
typedef struct Complex128 Complex128;
typedef struct WinCall WinCall;
+typedef struct Timers Timers;
+typedef struct Timer Timer;
/*
* per-cpu declaration.
uintptr waitsema; // semaphore for parking on locks
uint32 waitsemacount;
uint32 waitsemalock;
-
+
#ifdef __WINDOWS__
void* thread; // thread handle
#endif
};
#endif
+struct Timers
+{
+ Lock;
+ G *timerproc;
+ bool sleeping;
+ bool rescheduling;
+ Note waitnote;
+ Timer **t;
+ int32 len;
+ int32 cap;
+};
+
+// Package time knows the layout of this structure.
+// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
+struct Timer
+{
+ int32 i; // heap index
+
+ // Timer wakes up at when, and then at when+period, ... (period > 0 only)
+ // each time calling f(now, arg) in the timer goroutine, so f must be
+ // a well-behaved function and not block.
+ int64 when;
+ int64 period;
+ void (*f)(int64, Eface);
+ Eface arg;
+};
+
/*
* defined macros
* you need super-gopher-guru privilege
void runtime·exit(int32);
void runtime·breakpoint(void);
void runtime·gosched(void);
+void runtime·tsleep(int64);
+M* runtime·newm(void);
void runtime·goexit(void);
void runtime·asmcgocall(void (*fn)(void*), void*);
void runtime·entersyscall(void);
* subsequent noteclear must be called only after
* previous notesleep has returned, e.g. it's disallowed
* to call noteclear straight after notewakeup.
+ *
+ * notetsleep is like notesleep but wakes up after
+ * a given number of nanoseconds even if the event
+ * has not yet happened. if a goroutine uses notetsleep to
+ * wake up early, it must wait to call noteclear until it
+ * can be sure that no other goroutine is calling
+ * notewakeup.
*/
void runtime·noteclear(Note*);
void runtime·notesleep(Note*);
void runtime·notewakeup(Note*);
+void runtime·notetsleep(Note*, int64);
+
+/*
+ * low-level synchronization for implementing the above
+ */
+uintptr runtime·semacreate(void);
+int32 runtime·semasleep(int64);
+void runtime·semawakeup(M*);
+// or
+void runtime·futexsleep(uint32*, uint32, int64);
+void runtime·futexwakeup(uint32*, uint32);
/*
* This is consistent across Linux and BSD.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Runtime implementations to help package time.
+// Time-related runtime and pieces of package time.
package time
#include "runtime.h"
+#include "defs.h"
+#include "os.h"
+#include "arch.h"
+#include "malloc.h"
+static Timers timers;
+static void addtimer(Timer*);
+static bool deltimer(Timer*);
+
+// Package time APIs.
+// Godoc uses the comments in package time, not these.
+
+// Nanoseconds returns the current time in nanoseconds.
func Nanoseconds() (ret int64) {
ret = runtime·nanotime();
}
+
+// Sleep puts the current goroutine to sleep for at least ns nanoseconds.
+func Sleep(ns int64) {
+ g->status = Gwaiting;
+ g->waitreason = "sleep";
+ runtime·tsleep(ns);
+}
+
+// startTimer adds t to the timer heap.
+func startTimer(t *Timer) {
+ addtimer(t);
+}
+
+// stopTimer removes t from the timer heap if it is there.
+// It returns true if t was removed, false if t wasn't even there.
+func stopTimer(t *Timer) (stopped bool) {
+ stopped = deltimer(t);
+}
+
+// C runtime.
+
+static void timerproc(void);
+static void siftup(int32);
+static void siftdown(int32);
+
+// Ready the goroutine e.data.
+static void
+ready(int64 now, Eface e)
+{
+ USED(now);
+
+ runtime·ready(e.data);
+}
+
+// Put the current goroutine to sleep for ns nanoseconds.
+// The caller must have set g->status and g->waitreason.
+void
+runtime·tsleep(int64 ns)
+{
+ Timer t;
+
+ if(ns <= 0)
+ return;
+
+ t.when = runtime·nanotime() + ns;
+ t.period = 0;
+ t.f = ready;
+ t.arg.data = g;
+ addtimer(&t);
+ runtime·gosched();
+}
+
+// Add a timer to the heap and start or kick the timer proc
+// if the new timer is earlier than any of the others.
+static void
+addtimer(Timer *t)
+{
+ int32 n;
+ Timer **nt;
+
+ runtime·lock(&timers);
+ if(timers.len >= timers.cap) {
+ // Grow slice.
+ n = 16;
+ if(n <= timers.cap)
+ n = timers.cap*3 / 2;
+ nt = runtime·malloc(n*sizeof nt[0]);
+ runtime·memmove(nt, timers.t, timers.len*sizeof nt[0]);
+ runtime·free(timers.t);
+ timers.t = nt;
+ timers.cap = n;
+ }
+ t->i = timers.len++;
+ timers.t[t->i] = t;
+ siftup(t->i);
+ if(t->i == 0) {
+ // siftup moved to top: new earliest deadline.
+ if(timers.sleeping) {
+ timers.sleeping = false;
+ runtime·notewakeup(&timers.waitnote);
+ }
+ if(timers.rescheduling) {
+ timers.rescheduling = false;
+ runtime·ready(timers.timerproc);
+ }
+ }
+ if(timers.timerproc == nil)
+ timers.timerproc = runtime·newproc1((byte*)timerproc, nil, 0, 0, addtimer);
+ runtime·unlock(&timers);
+}
+
+// Delete timer t from the heap.
+// Do not need to update the timerproc:
+// if it wakes up early, no big deal.
+static bool
+deltimer(Timer *t)
+{
+ int32 i;
+
+ runtime·lock(&timers);
+
+ // t may not be registered anymore and may have
+ // a bogus i (typically 0, if generated by Go).
+ // Verify it before proceeding.
+ i = t->i;
+ if(i < 0 || i >= timers.len || timers.t[i] != t) {
+ runtime·unlock(&timers);
+ return false;
+ }
+
+ timers.t[i] = timers.t[--timers.len];
+ siftup(i);
+ siftdown(i);
+ runtime·unlock(&timers);
+ return true;
+}
+
+// Timerproc runs the time-driven events.
+// It sleeps until the next event in the timers heap.
+// If addtimer inserts a new earlier event, addtimer
+// wakes timerproc early.
+static void
+timerproc(void)
+{
+ int64 delta, now;
+ Timer *t;
+
+ for(;;) {
+ runtime·lock(&timers);
+ now = runtime·nanotime();
+ for(;;) {
+ if(timers.len == 0) {
+ delta = -1;
+ break;
+ }
+ t = timers.t[0];
+ delta = t->when - now;
+ if(delta > 0)
+ break;
+ if(t->period > 0) {
+ // leave in heap but adjust next time to fire
+ t->when += t->period * (1 + -delta/t->period);
+ siftdown(0);
+ } else {
+ // remove from heap
+ timers.t[0] = timers.t[--timers.len];
+ timers.t[0]->i = 0;
+ siftdown(0);
+ t->i = -1; // mark as removed
+ }
+ t->f(now, t->arg);
+ }
+ if(delta < 0) {
+ // No timers left - put goroutine to sleep.
+ timers.rescheduling = true;
+ g->status = Gwaiting;
+ g->waitreason = "timer goroutine (idle)";
+ runtime·unlock(&timers);
+ runtime·gosched();
+ continue;
+ }
+ // At least one timer pending. Sleep until then.
+ timers.sleeping = true;
+ runtime·noteclear(&timers.waitnote);
+ runtime·unlock(&timers);
+ runtime·entersyscall();
+ runtime·notetsleep(&timers.waitnote, delta);
+ runtime·exitsyscall();
+ }
+}
+
+// heap maintenance algorithms.
+
+static void
+siftup(int32 i)
+{
+ int32 p;
+ Timer **t, *tmp;
+
+ t = timers.t;
+ while(i > 0) {
+ p = (i-1)/2; // parent
+ if(t[i]->when >= t[p]->when)
+ break;
+ tmp = t[i];
+ t[i] = t[p];
+ t[p] = tmp;
+ t[i]->i = i;
+ t[p]->i = p;
+ i = p;
+ }
+}
+
+static void
+siftdown(int32 i)
+{
+ int32 c, len;
+ Timer **t, *tmp;
+
+ t = timers.t;
+ len = timers.len;
+ for(;;) {
+ c = i*2 + 1; // left child
+ if(c >= len) {
+ break;
+ }
+ if(c+1 < len && t[c+1]->when < t[c]->when)
+ c++;
+ if(t[c]->when >= t[i]->when)
+ break;
+ tmp = t[i];
+ t[i] = t[c];
+ t[c] = tmp;
+ t[i]->i = i;
+ t[c]->i = c;
+ i = c;
+ }
+}
runtime·stdcall(runtime·Sleep, 1, (uintptr)us);
}
-void
-runtime·semasleep(void)
+#define INFINITE ((uintptr)0xFFFFFFFF)
+
+int32
+runtime·semasleep(int64 ns)
{
- runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, (uintptr)-1);
+ uintptr ms;
+
+ if(ns < 0)
+ ms = INFINITE;
+ else if(ns/1000000 > 0x7fffffffLL)
+ ms = 0x7fffffff;
+ else {
+ ms = ns/1000000;
+ if(ms == 0)
+ ms = 1;
+ }
+ if(runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, ms) != 0)
+ return -1; // timeout
+ return 0;
}
void
int64 filetime;
runtime·stdcall(runtime·GetSystemTimeAsFileTime, 1, &filetime);
-
+
// Filetime is 100s of nanoseconds since January 1, 1601.
// Convert to nanoseconds since January 1, 1970.
return (filetime - 116444736000000000LL) * 100LL;
package time
-import (
- "container/heap"
- "sync"
-)
+// Interface to timers implemented in package runtime.
+// Must be in sync with ../runtime/runtime.h:/^struct.Timer$
+type runtimeTimer struct {
+ i int32
+ when int64
+ period int64
+ f func(int64, interface{})
+ arg interface{}
+}
+
+func startTimer(*runtimeTimer)
+func stopTimer(*runtimeTimer) bool
// The Timer type represents a single event.
-// When the Timer expires, the current time will be sent on C
-// unless the Timer represents an AfterFunc event.
+// When the Timer expires, the current time will be sent on C,
+// unless the Timer was created by AfterFunc.
type Timer struct {
C <-chan int64
- t int64 // The absolute time that the event should fire.
- f func(int64) // The function to call when the event fires.
- i int // The event's index inside eventHeap.
+ r runtimeTimer
}
-type timerHeap []*Timer
-
-// forever is the absolute time (in ns) of an event that is forever away.
-const forever = 1 << 62
-
-// maxSleepTime is the maximum length of time that a sleeper
-// sleeps for before checking if it is defunct.
-const maxSleepTime = 1e9
-
-var (
- // timerMutex guards the variables inside this var group.
- timerMutex sync.Mutex
-
- // timers holds a binary heap of pending events, terminated with a sentinel.
- timers timerHeap
-
- // currentSleeper is an ever-incrementing counter which represents
- // the current sleeper. It allows older sleepers to detect that they are
- // defunct and exit.
- currentSleeper int64
-)
-
-func init() {
- timers.Push(&Timer{t: forever}) // sentinel
+// Stop prevents the Timer from firing.
+// It returns true if the call stops the timer, false if the timer has already
+// expired or stopped.
+func (t *Timer) Stop() (ok bool) {
+ return stopTimer(&t.r)
}
// NewTimer creates a new Timer that will send
// the current time on its channel after at least ns nanoseconds.
func NewTimer(ns int64) *Timer {
c := make(chan int64, 1)
- e := after(ns, func(t int64) { c <- t })
- e.C = c
- return e
+ t := &Timer{
+ C: c,
+ r: runtimeTimer{
+ when: Nanoseconds() + ns,
+ f: sendTime,
+ arg: c,
+ },
+ }
+ startTimer(&t.r)
+ return t
+}
+
+func sendTime(now int64, c interface{}) {
+ // Non-blocking send of time on c.
+ // Used in NewTimer, it cannot block anyway (buffer).
+ // Used in NewTicker, dropping sends on the floor is
+ // the desired behavior when the reader gets behind,
+ // because the sends are periodic.
+ select {
+ case c.(chan int64) <- now:
+ default:
+ }
}
// After waits at least ns nanoseconds before sending the current time
// in its own goroutine. It returns a Timer that can
// be used to cancel the call using its Stop method.
func AfterFunc(ns int64, f func()) *Timer {
- return after(ns, func(_ int64) {
- go f()
- })
-}
-
-// Stop prevents the Timer from firing.
-// It returns true if the call stops the timer, false if the timer has already
-// expired or stopped.
-func (e *Timer) Stop() (ok bool) {
- timerMutex.Lock()
- // Avoid removing the first event in the queue so that
- // we don't start a new sleeper unnecessarily.
- if e.i > 0 {
- heap.Remove(timers, e.i)
- }
- ok = e.f != nil
- e.f = nil
- timerMutex.Unlock()
- return
-}
-
-// after is the implementation of After and AfterFunc.
-// When the current time is after ns, it calls f with the current time.
-// It assumes that f will not block.
-func after(ns int64, f func(int64)) (e *Timer) {
- now := Nanoseconds()
- t := now + ns
- if ns > 0 && t < now {
- panic("time: time overflow")
- }
- timerMutex.Lock()
- t0 := timers[0].t
- e = &Timer{nil, t, f, -1}
- heap.Push(timers, e)
- // Start a new sleeper if the new event is before
- // the first event in the queue. If the length of time
- // until the new event is at least maxSleepTime,
- // then we're guaranteed that the sleeper will wake up
- // in time to service it, so no new sleeper is needed.
- if t0 > t && (t0 == forever || ns < maxSleepTime) {
- currentSleeper++
- go sleeper(currentSleeper)
- }
- timerMutex.Unlock()
- return
-}
-
-// sleeper continually looks at the earliest event in the queue, waits until it happens,
-// then removes any events in the queue that are due. It stops when the queue
-// is empty or when another sleeper has been started.
-func sleeper(sleeperId int64) {
- timerMutex.Lock()
- e := timers[0]
- t := Nanoseconds()
- for e.t != forever {
- if dt := e.t - t; dt > 0 {
- if dt > maxSleepTime {
- dt = maxSleepTime
- }
- timerMutex.Unlock()
- sysSleep(dt)
- timerMutex.Lock()
- if currentSleeper != sleeperId {
- // Another sleeper has been started, making this one redundant.
- break
- }
- }
- e = timers[0]
- t = Nanoseconds()
- for t >= e.t {
- if e.f != nil {
- e.f(t)
- e.f = nil
- }
- heap.Pop(timers)
- e = timers[0]
- }
+ t := &Timer{
+ r: runtimeTimer{
+ when: Nanoseconds() + ns,
+ f: goFunc,
+ arg: f,
+ },
}
- timerMutex.Unlock()
-}
-
-func (timerHeap) Len() int {
- return len(timers)
-}
-
-func (timerHeap) Less(i, j int) bool {
- return timers[i].t < timers[j].t
-}
-
-func (timerHeap) Swap(i, j int) {
- timers[i], timers[j] = timers[j], timers[i]
- timers[i].i = i
- timers[j].i = j
-}
-
-func (timerHeap) Push(x interface{}) {
- e := x.(*Timer)
- e.i = len(timers)
- timers = append(timers, e)
+ startTimer(&t.r)
+ return t
}
-func (timerHeap) Pop() interface{} {
- // TODO: possibly shrink array.
- n := len(timers) - 1
- e := timers[n]
- timers[n] = nil
- timers = timers[0:n]
- e.i = -1
- return e
+func goFunc(now int64, arg interface{}) {
+ go arg.(func())()
}
func Nanoseconds() int64
// Sleep pauses the current goroutine for at least ns nanoseconds.
-// Higher resolution sleeping may be provided by syscall.Nanosleep
-// on some operating systems.
-func Sleep(ns int64) error {
- _, err := sleep(Nanoseconds(), ns)
- return err
-}
-
-// sleep takes the current time and a duration,
-// pauses for at least ns nanoseconds, and
-// returns the current time and an error.
-func sleep(t, ns int64) (int64, error) {
- // TODO(cw): use monotonic-time once it's available
- end := t + ns
- for t < end {
- err := sysSleep(end - t)
- if err != nil {
- return 0, err
- }
- t = Nanoseconds()
- }
- return t, nil
-}
+func Sleep(ns int64)
package time
-import (
- "errors"
- "sync"
-)
+import "errors"
// A Ticker holds a synchronous channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
- C <-chan int64 // The channel on which the ticks are delivered.
- c chan<- int64 // The same channel, but the end we use.
- ns int64
- shutdown chan bool // Buffered channel used to signal shutdown.
- nextTick int64
- next *Ticker
+ C <-chan int64 // The channel on which the ticks are delivered.
+ r runtimeTimer
}
-// Stop turns off a ticker. After Stop, no more ticks will be sent.
-func (t *Ticker) Stop() {
- select {
- case t.shutdown <- true:
- // ok
- default:
- // Stop in progress already
- }
-}
-
-// Tick is a convenience wrapper for NewTicker providing access to the ticking
-// channel only. Useful for clients that have no need to shut down the ticker.
-func Tick(ns int64) <-chan int64 {
- if ns <= 0 {
- return nil
- }
- return NewTicker(ns).C
-}
-
-type alarmer struct {
- wakeUp chan bool // wakeup signals sent/received here
- wakeMeAt chan int64
- wakeTime int64
-}
-
-// Set alarm to go off at time ns, if not already set earlier.
-func (a *alarmer) set(ns int64) {
- switch {
- case a.wakeTime > ns:
- // Next tick we expect is too late; shut down the late runner
- // and (after fallthrough) start a new wakeLoop.
- close(a.wakeMeAt)
- fallthrough
- case a.wakeMeAt == nil:
- // There's no wakeLoop, start one.
- a.wakeMeAt = make(chan int64)
- a.wakeUp = make(chan bool, 1)
- go wakeLoop(a.wakeMeAt, a.wakeUp)
- fallthrough
- case a.wakeTime == 0:
- // Nobody else is waiting; it's just us.
- a.wakeTime = ns
- a.wakeMeAt <- ns
- default:
- // There's already someone scheduled.
- }
-}
-
-// Channel to notify tickerLoop of new Tickers being created.
-var newTicker chan *Ticker
-
-func startTickerLoop() {
- newTicker = make(chan *Ticker)
- go tickerLoop()
-}
-
-// wakeLoop delivers ticks at scheduled times, sleeping until the right moment.
-// If another, earlier Ticker is created while it sleeps, tickerLoop() will start a new
-// wakeLoop and signal that this one is done by closing the wakeMeAt channel.
-func wakeLoop(wakeMeAt chan int64, wakeUp chan bool) {
- for wakeAt := range wakeMeAt {
- Sleep(wakeAt - Nanoseconds())
- wakeUp <- true
- }
-}
-
-// A single tickerLoop serves all ticks to Tickers. It waits for two events:
-// either the creation of a new Ticker or a tick from the alarm,
-// signaling a time to wake up one or more Tickers.
-func tickerLoop() {
- // Represents the next alarm to be delivered.
- var alarm alarmer
- var now, wakeTime int64
- var tickers *Ticker
- for {
- select {
- case t := <-newTicker:
- // Add Ticker to list
- t.next = tickers
- tickers = t
- // Arrange for a new alarm if this one precedes the existing one.
- alarm.set(t.nextTick)
- case <-alarm.wakeUp:
- now = Nanoseconds()
- wakeTime = now + 1e15 // very long in the future
- var prev *Ticker = nil
- // Scan list of tickers, delivering updates to those
- // that need it and determining the next wake time.
- // TODO(r): list should be sorted in time order.
- for t := tickers; t != nil; t = t.next {
- select {
- case <-t.shutdown:
- // Ticker is done; remove it from list.
- if prev == nil {
- tickers = t.next
- } else {
- prev.next = t.next
- }
- continue
- default:
- }
- if t.nextTick <= now {
- if len(t.c) == 0 {
- // Only send if there's room. We must not block.
- // The channel is allocated with a one-element
- // buffer, which is sufficient: if he hasn't picked
- // up the last tick, no point in sending more.
- t.c <- now
- }
- t.nextTick += t.ns
- if t.nextTick <= now {
- // Still behind; advance in one big step.
- t.nextTick += (now - t.nextTick + t.ns) / t.ns * t.ns
- }
- }
- if t.nextTick < wakeTime {
- wakeTime = t.nextTick
- }
- prev = t
- }
- if tickers != nil {
- // Please send wakeup at earliest required time.
- // If there are no tickers, don't bother.
- alarm.wakeTime = wakeTime
- alarm.wakeMeAt <- wakeTime
- } else {
- alarm.wakeTime = 0
- }
- }
- }
-}
-
-var onceStartTickerLoop sync.Once
-
// NewTicker returns a new Ticker containing a channel that will
// send the time, in nanoseconds, every ns nanoseconds. It adjusts the
// intervals to make up for pauses in delivery of the ticks. The value of
if ns <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
- c := make(chan int64, 1) // See comment on send in tickerLoop
+ // Give the channel a 1-element time buffer.
+ // If the client falls behind while reading, we drop ticks
+ // on the floor until the client catches up.
+ c := make(chan int64, 1)
t := &Ticker{
- C: c,
- c: c,
- ns: ns,
- shutdown: make(chan bool, 1),
- nextTick: Nanoseconds() + ns,
+ C: c,
+ r: runtimeTimer{
+ when: Nanoseconds() + ns,
+ period: ns,
+ f: sendTime,
+ arg: c,
+ },
}
- onceStartTickerLoop.Do(startTickerLoop)
- // must be run in background so global Tickers can be created
- go func() { newTicker <- t }()
+ startTimer(&t.r)
return t
}
+
+// Stop turns off a ticker. After Stop, no more ticks will be sent.
+func (t *Ticker) Stop() {
+ stopTimer(&t.r)
+}
+
+// Tick is a convenience wrapper for NewTicker providing access to the ticking
+// channel only. Useful for clients that have no need to shut down the ticker.
+func Tick(ns int64) <-chan int64 {
+ if ns <= 0 {
+ return nil
+ }
+ return NewTicker(ns).C
+}