Commit 3b860269 authored by Russ Cox's avatar Russ Cox

runtime: add timer support, use for package time

This looks like it is just moving some code from
time to runtime (and translating it to C), but the
runtime can do a better job managing the goroutines,
and it needs this functionality for its own maintenance
(for example, for the garbage collector to hand back
unused memory to the OS on a time delay).
Might as well have just one copy of the timer logic,
and runtime can't depend on time, so vice versa.

It also unifies Sleep, NewTicker, and NewTimer behind
one mechanism, so that there are no claims that one
is more efficient than another.  (For example, today
people recommend using time.After instead of time.Sleep
to avoid blocking an OS thread.)

Fixes #1644.
Fixes #1731.
Fixes #2190.

R=golang-dev, r, hectorchu, iant, iant, jsing, alex.brainman, dvyukov
CC=golang-dev
https://golang.org/cl/5334051
parent fbfed491
......@@ -9,7 +9,7 @@ int32 runtime·bsdthread_create(void*, M*, G*, void(*)(void));
void runtime·bsdthread_register(void);
int32 runtime·mach_msg_trap(MachHeader*, int32, uint32, uint32, uint32, uint32, uint32);
uint32 runtime·mach_reply_port(void);
void runtime·mach_semacquire(uint32);
int32 runtime·mach_semacquire(uint32, int64);
uint32 runtime·mach_semcreate(void);
void runtime·mach_semdestroy(uint32);
void runtime·mach_semrelease(uint32);
......
......@@ -17,10 +17,10 @@ unimplemented(int8 *name)
*(int32*)1231 = 1231;
}
void
runtime·semasleep(void)
int32
runtime·semasleep(int64 ns)
{
runtime·mach_semacquire(m->waitsema);
return runtime·mach_semacquire(m->waitsema, ns);
}
void
......@@ -252,6 +252,7 @@ enum
// Mach calls that get interrupted by Unix signals
// return this error code. We retry them.
KERN_ABORTED = 14,
KERN_OPERATION_TIMED_OUT = 49,
};
typedef struct Tmach_semcreateMsg Tmach_semcreateMsg;
......@@ -343,16 +344,25 @@ int32 runtime·mach_semaphore_timedwait(uint32 sema, uint32 sec, uint32 nsec);
int32 runtime·mach_semaphore_signal(uint32 sema);
int32 runtime·mach_semaphore_signal_all(uint32 sema);
void
runtime·mach_semacquire(uint32 sem)
int32
runtime·mach_semacquire(uint32 sem, int64 ns)
{
int32 r;
if(ns >= 0) {
r = runtime·mach_semaphore_timedwait(sem, ns/1000000000LL, ns%1000000000LL);
if(r == KERN_ABORTED || r == KERN_OPERATION_TIMED_OUT)
return -1;
if(r != 0)
macherror(r, "semaphore_wait");
return 0;
}
while((r = runtime·mach_semaphore_wait(sem)) != 0) {
if(r == KERN_ABORTED) // interrupted
continue;
macherror(r, "semaphore_wait");
}
return 0;
}
void
......
......@@ -10,14 +10,23 @@ extern SigTab runtime·sigtab[];
extern int32 runtime·sys_umtx_op(uint32*, int32, uint32, void*, void*);
// FreeBSD's umtx_op syscall is effectively the same as Linux's futex, and
// thus the code is largely similar. See linux/thread.c for comments.
// thus the code is largely similar. See linux/thread.c and lock_futex.c for comments.
void
runtime·futexsleep(uint32 *addr, uint32 val)
runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
int32 ret;
Timespec ts, *tsp;
if(ns < 0)
tsp = nil;
else {
ts.sec = ns / 1000000000LL;
ts.nsec = ns % 1000000000LL;
tsp = &ts;
}
ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, nil);
ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, tsp);
if(ret >= 0 || ret == -EINTR)
return;
......
......@@ -34,15 +34,29 @@ enum
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
void
runtime·futexsleep(uint32 *addr, uint32 val)
runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
Timespec ts, *tsp;
if(ns < 0)
tsp = nil;
else {
ts.tv_sec = ns/1000000000LL;
ts.tv_nsec = ns%1000000000LL;
// Avoid overflow
if(ts.tv_sec > 1<<30)
ts.tv_sec = 1<<30;
tsp = &ts;
}
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
runtime·futex(addr, FUTEX_WAIT, val, nil, nil, 0);
runtime·futex(addr, FUTEX_WAIT, val, tsp, nil, 0);
}
// If any procs are sleeping on addr, wake up at most cnt.
......
......@@ -4,6 +4,17 @@
#include "runtime.h"
// This implementation depends on OS-specific implementations of
//
// runtime.futexsleep(uint32 *addr, uint32 val, int64 ns)
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//
// runtime.futexwakeup(uint32 *addr, uint32 cnt)
// If any procs are sleeping on addr, wake up at most cnt.
enum
{
MUTEX_UNLOCKED = 0,
......@@ -15,14 +26,6 @@ enum
PASSIVE_SPIN = 1,
};
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
void runtime·futexsleep(uint32 *addr, uint32 val);
// If any procs are sleeping on addr, wake up at most cnt.
void runtime·futexwakeup(uint32 *addr, uint32 cnt);
// Possible lock states are MUTEX_UNLOCKED, MUTEX_LOCKED and MUTEX_SLEEPING.
// MUTEX_SLEEPING means that there is presumably at least one sleeping thread.
// Note that there can be spinning threads during all states - they do not
......@@ -77,7 +80,7 @@ runtime·lock(Lock *l)
if(v == MUTEX_UNLOCKED)
return;
wait = MUTEX_SLEEPING;
runtime·futexsleep(&l->key, MUTEX_SLEEPING);
runtime·futexsleep(&l->key, MUTEX_SLEEPING, -1);
}
}
......@@ -114,5 +117,30 @@ void
runtime·notesleep(Note *n)
{
while(runtime·atomicload(&n->key) == 0)
runtime·futexsleep(&n->key, 0);
runtime·futexsleep(&n->key, 0, -1);
}
void
runtime·notetsleep(Note *n, int64 ns)
{
int64 deadline, now;
if(ns < 0) {
runtime·notesleep(n);
return;
}
if(runtime·atomicload(&n->key) != 0)
return;
deadline = runtime·nanotime() + ns;
for(;;) {
runtime·futexsleep(&n->key, 0, ns);
if(runtime·atomicload(&n->key) != 0)
return;
now = runtime·nanotime();
if(now >= deadline)
return;
ns = deadline - now;
}
}
......@@ -4,6 +4,22 @@
#include "runtime.h"
// This implementation depends on OS-specific implementations of
//
// uintptr runtime.semacreate(void)
// Create a semaphore, which will be assigned to m->waitsema.
// The zero value is treated as absence of any semaphore,
// so be sure to return a non-zero value.
//
// int32 runtime.semasleep(int64 ns)
// If ns < 0, acquire m->waitsema and return 0.
// If ns >= 0, try to acquire m->waitsema for at most ns nanoseconds.
// Return 0 if the semaphore was acquired, -1 if interrupted or timed out.
//
// int32 runtime.semawakeup(M *mp)
// Wake up mp, which is or will soon be sleeping on mp->waitsema.
//
enum
{
LOCKED = 1,
......@@ -13,13 +29,6 @@ enum
PASSIVE_SPIN = 1,
};
// creates per-M semaphore (must not return 0)
uintptr runtime·semacreate(void);
// acquires per-M semaphore
void runtime·semasleep(void);
// releases mp's per-M semaphore
void runtime·semawakeup(M *mp);
void
runtime·lock(Lock *l)
{
......@@ -68,8 +77,8 @@ unlocked:
goto unlocked;
}
if(v&LOCKED) {
// Wait.
runtime·semasleep();
// Queued. Wait.
runtime·semasleep(-1);
i = 0;
}
}
......@@ -95,7 +104,7 @@ runtime·unlock(Lock *l)
// Dequeue an M.
mp = (void*)(v&~LOCKED);
if(runtime·casp(&l->waitm, (void*)v, mp->nextwaitm)) {
// Wake that M.
// Dequeued an M. Wake it.
runtime·semawakeup(mp);
break;
}
......@@ -113,9 +122,23 @@ runtime·noteclear(Note *n)
void
runtime·notewakeup(Note *n)
{
if(runtime·casp(&n->waitm, nil, (void*)LOCKED))
return;
runtime·semawakeup(n->waitm);
M *mp;
do
mp = runtime·atomicloadp(&n->waitm);
while(!runtime·casp(&n->waitm, mp, (void*)LOCKED));
// Successfully set waitm to LOCKED.
// What was it before?
if(mp == nil) {
// Nothing was waiting. Done.
} else if(mp == (M*)LOCKED) {
// Two notewakeups! Not allowed.
runtime·throw("notewakeup - double wakeup");
} else {
// Must be the waiting m. Wake it up.
runtime·semawakeup(mp);
}
}
void
......@@ -123,6 +146,72 @@ runtime·notesleep(Note *n)
{
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
if(runtime·casp(&n->waitm, nil, m))
runtime·semasleep();
if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup)
if(n->waitm != (void*)LOCKED)
runtime·throw("notesleep - waitm out of sync");
return;
}
// Queued. Sleep.
runtime·semasleep(-1);
}
void
runtime·notetsleep(Note *n, int64 ns)
{
M *mp;
int64 deadline, now;
if(ns < 0) {
runtime·notesleep(n);
return;
}
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
// Register for wakeup on n->waitm.
if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup already)
if(n->waitm != (void*)LOCKED)
runtime·throw("notetsleep - waitm out of sync");
return;
}
deadline = runtime·nanotime() + ns;
for(;;) {
// Registered. Sleep.
if(runtime·semasleep(ns) >= 0) {
// Acquired semaphore, semawakeup unregistered us.
// Done.
return;
}
// Interrupted or timed out. Still registered. Semaphore not acquired.
now = runtime·nanotime();
if(now >= deadline)
break;
// Deadline hasn't arrived. Keep sleeping.
ns = deadline - now;
}
// Deadline arrived. Still registered. Semaphore not acquired.
// Want to give up and return, but have to unregister first,
// so that any notewakeup racing with the return does not
// try to grant us the semaphore when we don't expect it.
for(;;) {
mp = runtime·atomicloadp(&n->waitm);
if(mp == m) {
// No wakeup yet; unregister if possible.
if(runtime·casp(&n->waitm, mp, nil))
return;
} else if(mp == (M*)LOCKED) {
// Wakeup happened so semaphore is available.
// Grab it to avoid getting out of sync.
if(runtime·semasleep(-1) < 0)
runtime·throw("runtime: unable to acquire - semaphore out of sync");
return;
} else {
runtime·throw("runtime: unexpected waitm - semaphore out of sync");
}
}
}
......@@ -62,21 +62,52 @@ runtime·semacreate(void)
return 1;
}
void
runtime·semasleep(void)
int32
runtime·semasleep(int64 ns)
{
retry:
Timespec ts;
// spin-mutex lock
while(runtime·xchg(&m->waitsemalock, 1))
runtime·osyield();
for(;;) {
// lock held
if(m->waitsemacount == 0) {
// the function unlocks the spinlock
// sleep until semaphore != 0 or timeout.
// thrsleep unlocks m->waitsemalock.
if(ns < 0)
runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
goto retry;
else {
ts.tv_sec = ns/1000000000LL;
ts.tv_nsec = ns%1000000000LL;
runtime·thrsleep(&m->waitsemacount, CLOCK_REALTIME, &ts, &m->waitsemalock);
}
// reacquire lock
while(runtime·xchg(&m->waitsemalock, 1))
runtime·osyield();
}
// lock held (again)
if(m->waitsemacount != 0) {
// semaphore is available.
m->waitsemacount--;
// spin-mutex unlock
runtime·atomicstore(&m->waitsemalock, 0);
return 0; // semaphore acquired
}
// semaphore not available.
// if there is a timeout, stop now.
// otherwise keep trying.
if(ns >= 0)
break;
}
// lock held but giving up
// spin-mutex unlock
runtime·atomicstore(&m->waitsemalock, 0);
return -1;
}
void
......
......@@ -120,12 +120,45 @@ runtime·semacreate(void)
return 1;
}
void
runtime·semasleep(void)
int32
runtime·semasleep(int64 ns)
{
int32 ret;
int32 ms;
if(ns >= 0) {
// TODO: Plan 9 needs a new system call, tsemacquire.
// The kernel implementation is the same as semacquire
// except with a tsleep and check for timeout.
// It would be great if the implementation returned the
// value that was added to the semaphore, so that on
// timeout the return value would be 0, on success 1.
// Then the error string does not have to be parsed
// to detect timeout.
//
// If a negative time indicates no timeout, then
// semacquire can be implemented (in the kernel)
// as tsemacquire(p, v, -1).
runtime·throw("semasleep: timed sleep not implemented on Plan 9");
/*
if(ns < 0)
ms = -1;
else if(ns/1000 > 0x7fffffffll)
ms = 0x7fffffff;
else
ms = ns/1000;
ret = runtime·plan9_tsemacquire(&m->waitsemacount, 1, ms);
if(ret == 1)
return 0; // success
return -1; // timeout or interrupted
*/
}
while(runtime·plan9_semacquire(&m->waitsemacount, 1) < 0) {
/* interrupted; try again */
}
return 0; // success
}
void
......
......@@ -15,7 +15,6 @@ static void unwindstack(G*, byte*);
static void schedule(G*);
static void acquireproc(void);
static void releaseproc(void);
static M *startm(void);
typedef struct Sched Sched;
......@@ -701,7 +700,7 @@ runtime·starttheworld(bool extra)
// but m is not running a specific goroutine,
// so set the helpgc flag as a signal to m's
// first schedule(nil) to mcpu-- and grunning--.
m = startm();
m = runtime·newm();
m->helpgc = 1;
runtime·sched.grunning++;
}
......@@ -756,14 +755,14 @@ matchmg(void)
// Find the m that will run gp.
if((mp = mget(gp)) == nil)
mp = startm();
mp = runtime·newm();
mnextg(mp, gp);
}
}
// Create a new m. It will start off with a call to runtime·mstart.
static M*
startm(void)
M*
runtime·newm(void)
{
M *m;
......
......@@ -70,6 +70,8 @@ typedef struct Hchan Hchan;
typedef struct Complex64 Complex64;
typedef struct Complex128 Complex128;
typedef struct WinCall WinCall;
typedef struct Timers Timers;
typedef struct Timer Timer;
/*
* per-cpu declaration.
......@@ -315,6 +317,33 @@ enum {
};
#endif
struct Timers
{
Lock;
G *timerproc;
bool sleeping;
bool rescheduling;
Note waitnote;
Timer **t;
int32 len;
int32 cap;
};
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
struct Timer
{
int32 i; // heap index
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(now, arg) in the timer goroutine, so f must be
// a well-behaved function and not block.
int64 when;
int64 period;
void (*f)(int64, Eface);
Eface arg;
};
/*
* defined macros
* you need super-gopher-guru privilege
......@@ -483,6 +512,8 @@ uint32 runtime·fastrand1(void);
void runtime·exit(int32);
void runtime·breakpoint(void);
void runtime·gosched(void);
void runtime·tsleep(int64);
M* runtime·newm(void);
void runtime·goexit(void);
void runtime·asmcgocall(void (*fn)(void*), void*);
void runtime·entersyscall(void);
......@@ -540,10 +571,28 @@ void runtime·unlock(Lock*);
* subsequent noteclear must be called only after
* previous notesleep has returned, e.g. it's disallowed
* to call noteclear straight after notewakeup.
*
* notetsleep is like notesleep but wakes up after
* a given number of nanoseconds even if the event
* has not yet happened. if a goroutine uses notetsleep to
* wake up early, it must wait to call noteclear until it
* can be sure that no other goroutine is calling
* notewakeup.
*/
void runtime·noteclear(Note*);
void runtime·notesleep(Note*);
void runtime·notewakeup(Note*);
void runtime·notetsleep(Note*, int64);
/*
* low-level synchronization for implementing the above
*/
uintptr runtime·semacreate(void);
int32 runtime·semasleep(int64);
void runtime·semawakeup(M*);
// or
void runtime·futexsleep(uint32*, uint32, int64);
void runtime·futexwakeup(uint32*, uint32);
/*
* This is consistent across Linux and BSD.
......
......@@ -2,12 +2,242 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Runtime implementations to help package time.
// Time-related runtime and pieces of package time.
package time
#include "runtime.h"
#include "defs.h"
#include "os.h"
#include "arch.h"
#include "malloc.h"
static Timers timers;
static void addtimer(Timer*);
static bool deltimer(Timer*);
// Package time APIs.
// Godoc uses the comments in package time, not these.
// Nanoseconds returns the current time in nanoseconds.
func Nanoseconds() (ret int64) {
ret = runtime·nanotime();
}
// Sleep puts the current goroutine to sleep for at least ns nanoseconds.
func Sleep(ns int64) {
g->status = Gwaiting;
g->waitreason = "sleep";
runtime·tsleep(ns);
}
// startTimer adds t to the timer heap.
func startTimer(t *Timer) {
addtimer(t);
}
// stopTimer removes t from the timer heap if it is there.
// It returns true if t was removed, false if t wasn't even there.
func stopTimer(t *Timer) (stopped bool) {
stopped = deltimer(t);
}
// C runtime.
static void timerproc(void);
static void siftup(int32);
static void siftdown(int32);
// Ready the goroutine e.data.
static void
ready(int64 now, Eface e)
{
USED(now);
runtime·ready(e.data);
}
// Put the current goroutine to sleep for ns nanoseconds.
// The caller must have set g->status and g->waitreason.
void
runtime·tsleep(int64 ns)
{
Timer t;
if(ns <= 0)
return;
t.when = runtime·nanotime() + ns;
t.period = 0;
t.f = ready;
t.arg.data = g;
addtimer(&t);
runtime·gosched();
}
// Add a timer to the heap and start or kick the timer proc
// if the new timer is earlier than any of the others.
static void
addtimer(Timer *t)
{
int32 n;
Timer **nt;
runtime·lock(&timers);
if(timers.len >= timers.cap) {
// Grow slice.
n = 16;
if(n <= timers.cap)
n = timers.cap*3 / 2;
nt = runtime·malloc(n*sizeof nt[0]);
runtime·memmove(nt, timers.t, timers.len*sizeof nt[0]);
runtime·free(timers.t);
timers.t = nt;
timers.cap = n;
}
t->i = timers.len++;
timers.t[t->i] = t;
siftup(t->i);
if(t->i == 0) {
// siftup moved to top: new earliest deadline.
if(timers.sleeping) {
timers.sleeping = false;
runtime·notewakeup(&timers.waitnote);
}
if(timers.rescheduling) {
timers.rescheduling = false;
runtime·ready(timers.timerproc);
}
}
if(timers.timerproc == nil)
timers.timerproc = runtime·newproc1((byte*)timerproc, nil, 0, 0, addtimer);
runtime·unlock(&timers);
}
// Delete timer t from the heap.
// Do not need to update the timerproc:
// if it wakes up early, no big deal.
static bool
deltimer(Timer *t)
{
int32 i;
runtime·lock(&timers);
// t may not be registered anymore and may have
// a bogus i (typically 0, if generated by Go).
// Verify it before proceeding.
i = t->i;
if(i < 0 || i >= timers.len || timers.t[i] != t) {
runtime·unlock(&timers);
return false;
}
timers.t[i] = timers.t[--timers.len];
siftup(i);
siftdown(i);
runtime·unlock(&timers);
return true;
}
// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, addtimer
// wakes timerproc early.
static void
timerproc(void)
{
int64 delta, now;
Timer *t;
for(;;) {
runtime·lock(&timers);
now = runtime·nanotime();
for(;;) {
if(timers.len == 0) {
delta = -1;
break;
}
t = timers.t[0];
delta = t->when - now;
if(delta > 0)
break;
if(t->period > 0) {
// leave in heap but adjust next time to fire
t->when += t->period * (1 + -delta/t->period);
siftdown(0);
} else {
// remove from heap
timers.t[0] = timers.t[--timers.len];
timers.t[0]->i = 0;
siftdown(0);
t->i = -1; // mark as removed
}
t->f(now, t->arg);
}
if(delta < 0) {
// No timers left - put goroutine to sleep.
timers.rescheduling = true;
g->status = Gwaiting;
g->waitreason = "timer goroutine (idle)";
runtime·unlock(&timers);
runtime·gosched();
continue;
}
// At least one timer pending. Sleep until then.
timers.sleeping = true;
runtime·noteclear(&timers.waitnote);
runtime·unlock(&timers);
runtime·entersyscall();
runtime·notetsleep(&timers.waitnote, delta);
runtime·exitsyscall();
}
}
// heap maintenance algorithms.
static void
siftup(int32 i)
{
int32 p;
Timer **t, *tmp;
t = timers.t;
while(i > 0) {
p = (i-1)/2; // parent
if(t[i]->when >= t[p]->when)
break;
tmp = t[i];
t[i] = t[p];
t[p] = tmp;
t[i]->i = i;
t[p]->i = p;
i = p;
}
}
static void
siftdown(int32 i)
{
int32 c, len;
Timer **t, *tmp;
t = timers.t;
len = timers.len;
for(;;) {
c = i*2 + 1; // left child
if(c >= len) {
break;
}
if(c+1 < len && t[c+1]->when < t[c]->when)
c++;
if(t[c]->when >= t[i]->when)
break;
tmp = t[i];
t[i] = t[c];
t[c] = tmp;
t[i]->i = i;
t[c]->i = c;
i = c;
}
}
......@@ -150,10 +150,25 @@ runtime·usleep(uint32 us)
runtime·stdcall(runtime·Sleep, 1, (uintptr)us);
}
void
runtime·semasleep(void)
#define INFINITE ((uintptr)0xFFFFFFFF)
int32
runtime·semasleep(int64 ns)
{
runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, (uintptr)-1);
uintptr ms;
if(ns < 0)
ms = INFINITE;
else if(ns/1000000 > 0x7fffffffLL)
ms = 0x7fffffff;
else {
ms = ns/1000000;
if(ms == 0)
ms = 1;
}
if(runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, ms) != 0)
return -1; // timeout
return 0;
}
void
......
......@@ -4,54 +4,60 @@
package time
import (
"container/heap"
"sync"
)
// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/runtime.h:/^struct.Timer$
type runtimeTimer struct {
i int32
when int64
period int64
f func(int64, interface{})
arg interface{}
}
func startTimer(*runtimeTimer)
func stopTimer(*runtimeTimer) bool
// The Timer type represents a single event.
// When the Timer expires, the current time will be sent on C
// unless the Timer represents an AfterFunc event.
// When the Timer expires, the current time will be sent on C,
// unless the Timer was created by AfterFunc.
type Timer struct {
C <-chan int64
t int64 // The absolute time that the event should fire.
f func(int64) // The function to call when the event fires.
i int // The event's index inside eventHeap.
r runtimeTimer
}
type timerHeap []*Timer
// forever is the absolute time (in ns) of an event that is forever away.
const forever = 1 << 62
// maxSleepTime is the maximum length of time that a sleeper
// sleeps for before checking if it is defunct.
const maxSleepTime = 1e9
var (
// timerMutex guards the variables inside this var group.
timerMutex sync.Mutex
// timers holds a binary heap of pending events, terminated with a sentinel.
timers timerHeap
// currentSleeper is an ever-incrementing counter which represents
// the current sleeper. It allows older sleepers to detect that they are
// defunct and exit.
currentSleeper int64
)
func init() {
timers.Push(&Timer{t: forever}) // sentinel
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or stopped.
func (t *Timer) Stop() (ok bool) {
return stopTimer(&t.r)
}
// NewTimer creates a new Timer that will send
// the current time on its channel after at least ns nanoseconds.
func NewTimer(ns int64) *Timer {
c := make(chan int64, 1)
e := after(ns, func(t int64) { c <- t })
e.C = c
return e
t := &Timer{
C: c,
r: runtimeTimer{
when: Nanoseconds() + ns,
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func sendTime(now int64, c interface{}) {
// Non-blocking send of time on c.
// Used in NewTimer, it cannot block anyway (buffer).
// Used in NewTicker, dropping sends on the floor is
// the desired behavior when the reader gets behind,
// because the sends are periodic.
select {
case c.(chan int64) <- now:
default:
}
}
// After waits at least ns nanoseconds before sending the current time
......@@ -65,113 +71,17 @@ func After(ns int64) <-chan int64 {
// in its own goroutine. It returns a Timer that can
// be used to cancel the call using its Stop method.
func AfterFunc(ns int64, f func()) *Timer {
return after(ns, func(_ int64) {
go f()
})
}
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or stopped.
func (e *Timer) Stop() (ok bool) {
timerMutex.Lock()
// Avoid removing the first event in the queue so that
// we don't start a new sleeper unnecessarily.
if e.i > 0 {
heap.Remove(timers, e.i)
t := &Timer{
r: runtimeTimer{
when: Nanoseconds() + ns,
f: goFunc,
arg: f,
},
}
ok = e.f != nil
e.f = nil
timerMutex.Unlock()
return
}
// after is the implementation of After and AfterFunc.
// When the current time is after ns, it calls f with the current time.
// It assumes that f will not block.
func after(ns int64, f func(int64)) (e *Timer) {
now := Nanoseconds()
t := now + ns
if ns > 0 && t < now {
panic("time: time overflow")
}
timerMutex.Lock()
t0 := timers[0].t
e = &Timer{nil, t, f, -1}
heap.Push(timers, e)
// Start a new sleeper if the new event is before
// the first event in the queue. If the length of time
// until the new event is at least maxSleepTime,
// then we're guaranteed that the sleeper will wake up
// in time to service it, so no new sleeper is needed.
if t0 > t && (t0 == forever || ns < maxSleepTime) {
currentSleeper++
go sleeper(currentSleeper)
}
timerMutex.Unlock()
return
}
// sleeper continually looks at the earliest event in the queue, waits until it happens,
// then removes any events in the queue that are due. It stops when the queue
// is empty or when another sleeper has been started.
func sleeper(sleeperId int64) {
timerMutex.Lock()
e := timers[0]
t := Nanoseconds()
for e.t != forever {
if dt := e.t - t; dt > 0 {
if dt > maxSleepTime {
dt = maxSleepTime
}
timerMutex.Unlock()
sysSleep(dt)
timerMutex.Lock()
if currentSleeper != sleeperId {
// Another sleeper has been started, making this one redundant.
break
}
}
e = timers[0]
t = Nanoseconds()
for t >= e.t {
if e.f != nil {
e.f(t)
e.f = nil
}
heap.Pop(timers)
e = timers[0]
}
}
timerMutex.Unlock()
}
func (timerHeap) Len() int {
return len(timers)
}
func (timerHeap) Less(i, j int) bool {
return timers[i].t < timers[j].t
}
func (timerHeap) Swap(i, j int) {
timers[i], timers[j] = timers[j], timers[i]
timers[i].i = i
timers[j].i = j
}
func (timerHeap) Push(x interface{}) {
e := x.(*Timer)
e.i = len(timers)
timers = append(timers, e)
startTimer(&t.r)
return t
}
func (timerHeap) Pop() interface{} {
// TODO: possibly shrink array.
n := len(timers) - 1
e := timers[n]
timers[n] = nil
timers = timers[0:n]
e.i = -1
return e
func goFunc(now int64, arg interface{}) {
go arg.(func())()
}
......@@ -17,25 +17,4 @@ func Seconds() int64 {
func Nanoseconds() int64
// Sleep pauses the current goroutine for at least ns nanoseconds.
// Higher resolution sleeping may be provided by syscall.Nanosleep
// on some operating systems.
func Sleep(ns int64) error {
_, err := sleep(Nanoseconds(), ns)
return err
}
// sleep takes the current time and a duration,
// pauses for at least ns nanoseconds, and
// returns the current time and an error.
func sleep(t, ns int64) (int64, error) {
// TODO(cw): use monotonic-time once it's available
end := t + ns
for t < end {
err := sysSleep(end - t)
if err != nil {
return 0, err
}
t = Nanoseconds()
}
return t, nil
}
func Sleep(ns int64)
......@@ -4,156 +4,15 @@
package time
import (
"errors"
"sync"
)
import "errors"
// A Ticker holds a synchronous channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
C <-chan int64 // The channel on which the ticks are delivered.
c chan<- int64 // The same channel, but the end we use.
ns int64
shutdown chan bool // Buffered channel used to signal shutdown.
nextTick int64
next *Ticker
r runtimeTimer
}
// Stop turns off a ticker. After Stop, no more ticks will be sent.
func (t *Ticker) Stop() {
select {
case t.shutdown <- true:
// ok
default:
// Stop in progress already
}
}
// Tick is a convenience wrapper for NewTicker providing access to the ticking
// channel only. Useful for clients that have no need to shut down the ticker.
func Tick(ns int64) <-chan int64 {
if ns <= 0 {
return nil
}
return NewTicker(ns).C
}
type alarmer struct {
wakeUp chan bool // wakeup signals sent/received here
wakeMeAt chan int64
wakeTime int64
}
// Set alarm to go off at time ns, if not already set earlier.
func (a *alarmer) set(ns int64) {
switch {
case a.wakeTime > ns:
// Next tick we expect is too late; shut down the late runner
// and (after fallthrough) start a new wakeLoop.
close(a.wakeMeAt)
fallthrough
case a.wakeMeAt == nil:
// There's no wakeLoop, start one.
a.wakeMeAt = make(chan int64)
a.wakeUp = make(chan bool, 1)
go wakeLoop(a.wakeMeAt, a.wakeUp)
fallthrough
case a.wakeTime == 0:
// Nobody else is waiting; it's just us.
a.wakeTime = ns
a.wakeMeAt <- ns
default:
// There's already someone scheduled.
}
}
// Channel to notify tickerLoop of new Tickers being created.
var newTicker chan *Ticker
func startTickerLoop() {
newTicker = make(chan *Ticker)
go tickerLoop()
}
// wakeLoop delivers ticks at scheduled times, sleeping until the right moment.
// If another, earlier Ticker is created while it sleeps, tickerLoop() will start a new
// wakeLoop and signal that this one is done by closing the wakeMeAt channel.
func wakeLoop(wakeMeAt chan int64, wakeUp chan bool) {
for wakeAt := range wakeMeAt {
Sleep(wakeAt - Nanoseconds())
wakeUp <- true
}
}
// A single tickerLoop serves all ticks to Tickers. It waits for two events:
// either the creation of a new Ticker or a tick from the alarm,
// signaling a time to wake up one or more Tickers.
func tickerLoop() {
// Represents the next alarm to be delivered.
var alarm alarmer
var now, wakeTime int64
var tickers *Ticker
for {
select {
case t := <-newTicker:
// Add Ticker to list
t.next = tickers
tickers = t
// Arrange for a new alarm if this one precedes the existing one.
alarm.set(t.nextTick)
case <-alarm.wakeUp:
now = Nanoseconds()
wakeTime = now + 1e15 // very long in the future
var prev *Ticker = nil
// Scan list of tickers, delivering updates to those
// that need it and determining the next wake time.
// TODO(r): list should be sorted in time order.
for t := tickers; t != nil; t = t.next {
select {
case <-t.shutdown:
// Ticker is done; remove it from list.
if prev == nil {
tickers = t.next
} else {
prev.next = t.next
}
continue
default:
}
if t.nextTick <= now {
if len(t.c) == 0 {
// Only send if there's room. We must not block.
// The channel is allocated with a one-element
// buffer, which is sufficient: if he hasn't picked
// up the last tick, no point in sending more.
t.c <- now
}
t.nextTick += t.ns
if t.nextTick <= now {
// Still behind; advance in one big step.
t.nextTick += (now - t.nextTick + t.ns) / t.ns * t.ns
}
}
if t.nextTick < wakeTime {
wakeTime = t.nextTick
}
prev = t
}
if tickers != nil {
// Please send wakeup at earliest required time.
// If there are no tickers, don't bother.
alarm.wakeTime = wakeTime
alarm.wakeMeAt <- wakeTime
} else {
alarm.wakeTime = 0
}
}
}
}
var onceStartTickerLoop sync.Once
// NewTicker returns a new Ticker containing a channel that will
// send the time, in nanoseconds, every ns nanoseconds. It adjusts the
// intervals to make up for pauses in delivery of the ticks. The value of
......@@ -162,16 +21,33 @@ func NewTicker(ns int64) *Ticker {
if ns <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan int64, 1) // See comment on send in tickerLoop
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan int64, 1)
t := &Ticker{
C: c,
c: c,
ns: ns,
shutdown: make(chan bool, 1),
nextTick: Nanoseconds() + ns,
}
onceStartTickerLoop.Do(startTickerLoop)
// must be run in background so global Tickers can be created
go func() { newTicker <- t }()
r: runtimeTimer{
when: Nanoseconds() + ns,
period: ns,
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
// Stop turns off a ticker. After Stop, no more ticks will be sent.
func (t *Ticker) Stop() {
stopTimer(&t.r)
}
// Tick is a convenience wrapper for NewTicker providing access to the ticking
// channel only. Useful for clients that have no need to shut down the ticker.
func Tick(ns int64) <-chan int64 {
if ns <= 0 {
return nil
}
return NewTicker(ns).C
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment