Commit 044deb35 authored by Kirill Smelkov's avatar Kirill Smelkov

time: Redo timers properly

Background: in 2019 in 9c260fde (time: New package that mirrors Go's
time) and b073f6df (time: Move/Port timers to C++/Pyx nogil) I've added
basic timers - with proper API but with very dumb implementation that
was spawning one thread per each timer. There were just a few timers in
the users and this was working, surprisingly, relatively ok...

... until 2023 where I was working on XLTE that needs to organize 100Hz
polling of Amarisoft eNodeB service to retrieve information about flows
on Data Radio Bearers:

    xlte@2a016d48
    https://lab.nexedi.com/kirr/xlte/-/blob/8e606c64/amari/drb.py

There each request comes with its own deadline - to catch "no reply",
and the deadlines are implemented via timers. So there are 100 threads
created every second which adds visible overhead, consumes a lot of
virtual address space and RSS for threads stacks, and should be all unnecessary.

We was tolerating even that for some time, but recently Joanne approached me
with reports that xamari program, that does the polling, is leaking memory.

With that, and because it was hard to find what is actually leaking,
I've started to remove uncertainties and there are a lot of uncertainty
in what is going on when lots of threads are being created over and over.

In the end the leak turned out to be likely a different thing (see
nexedi/pygolang!24, still
discovered while working on hereby patch), but all of the above was
enough motivation to finally start redoing the timers properly.

--------

So when it comes to do the timers properly more or less, there is
usually queue of armed timers, and a loop that picks entries from that
queue to fire them. I was initially trying to do the simple thing and
use std::priority_queue for that, because priority_queue is internally
heap, and heaps can provide O(log(n)) insertion and removal of arbitrary
element, plus O(1) "pick top element to process". Exactly what would
suit. However I quickly found that even in 2024, std::priority_queue
does not provide removal operation at all, and there is no such thing as
e.g. std::sift_heap, that would help to implement that manually. Which
is surprising, because e.g. libevent implements all that just ok via
sifting up/down upon removal in logarithmic complexity:

https://github.com/libevent/libevent/blob/80e25c02/minheap-internal.h#L96-L115

the lack of efficient removal operation turned out to be a blocker to
use std::priority_queue because most of the timers, that are armed for
timeouts, are never expired and upon successful completion of covered
operation, the timer is stopped. In other words the timer is removed
from the timer queue and the removal is one of the most often
operations.

So, if std::priority_queue cannot work, we would need to either bring in
another implementation of a heap, or, if we are to bring something,
bring and use something else that is more suitable for implementing
timers.

That reminded me that in 2005 for my Navy project, I already implemented
custom timer wheel to handle timeouts after reading https://lwn.net/Articles/152436/ .
Contrary to heaps, such timer wheels provide O(1) insertion and removal
of timers and work generally faster. But this time I did not want to
delve into implementing all that myself again and tried to look around
of what is available out there.

There was an update to kernel timer-wheel implementation described at
https://lwn.net/Articles/646950/ and from that a project called
Timeout.c was also found that provides implementation for such a wheel
for user space: https://25thandclement.com/~william/projects/timeout.c.html .

However when we are to pick third-party code, we should be ready to
understand it and fix bugs there on our own. So the audit of timeout.c
did not went very smoothly - there are many platform-depended places,
and the issue tracker shows signs that sometimes not everything is ok
with the implementation. With that I've looked around a bit more and
found more compact and more portable Ratas library with good structure
and description and whose audit came more well:

    https://www.snellman.net/blog/archive/2016-07-27-ratas-hierarchical-timer-wheel
    https://github.com/jsnell/ratas

Here, after going through the code, I feel to be capable to understand
issues and fix bugs myself if that would become needed.

And the benchmark comparison of Timeout.c and Ratas shows that they
should be of the same order regarding performance:

https://lab.nexedi.com/kirr/misc/-/blob/4f51fd6/bench/time-wheel/ratas-vs-timeout.pdf
ratas@382321d2
timeout@d6f15744

which makes Ratas the winner for me.

Having timer-wheel implementation, the rest is just technique to glue it
all together. One implementation aspect deserves to be mentioned though:

The timer loop uses Semaphore.acquire, recently modernized to also
accept timeout, to organize sleep in between pauses with also being able
to be simultaneously woken up if new timer is armed with earlier
expiration time.

Other than that the changes are mostly straightforward. Please see the
patch itself for details.

Regarding how the new implementation is more efficient for what we had
before, there are added benchmarks to measure arming timers that do not
fire, and, for symmetry, arming timers that do fire. We are most
interested in the first benchmark, because it shows how cheap or
expensive it is to use timers to implement timeouts, but the second one
is also useful to have to see the overhead of the whole timers machinery.

On my machine under py3.11 they go as after this patch:

    name              time/op
    timer_arm_cancel   805ns ± 0%
    timer_arm_fire    9.63µs ± 0%

and before the patch the benchmarks simply do not run till the end
because they run out of memory due to huge number of threads being
created.

Still with the following test program we can measure the effect new
timers implementation has:

    ---- 8< ----
    from golang import time

    def main():
        δt_rate = 1*time.millisecond

        tprev = time.now()
        tnext = tprev + δt_rate
        while 1:
            timer = time.Timer(5*time.second)
            _ = timer.stop()
            assert _ is True

            t = time.now()
            δtsleep = tnext - t
            #print('sleep %.3f ms' % (δtsleep/time.millisecond))
            time.sleep(δtsleep)
            tprev = tnext
            tnext += δt_rate

    main()
    ---- 8< ----

This program creates/arms and cancels a timer 1000 times per second.

Before hereby patch this program consumes ~ 30% of CPU, while after
hereby patch this program consumes ~ 7-8% of CPU.

For the reference just a sleep part of that program, with all code
related to timers removed consumes ~5% of CPU, while the consumption of
plain sleep(1ms) in C and directly using system calls

    ---- 8< ----
    #include <unistd.h>

    int main() {
        while (1) {
            usleep(1000);
        }
        return 0;
    }
    ---- 8< ----

is ~ 3-4% of CPU on my machine.

/cc @jerome
/cc ORS team (@jhuge, @lu.xu, @tomo, @xavier_thompson, @Daetalus)
/proposed-for-review-on nexedi/pygolang!26
parent 82c55254
[submodule "3rdparty/ratas"]
path = 3rdparty/ratas
url = https://github.com/jsnell/ratas.git
Subproject commit becd5fc5c1e9ea600cd8b3b1c24d564794fedac4
......@@ -131,6 +131,7 @@ using internal::_runtime;
namespace internal { namespace atomic { extern void _init(); } }
namespace os { namespace signal { extern void _init(); } }
namespace time { extern void _init(); }
void _libgolang_init(const _libgolang_runtime_ops *runtime_ops) {
if (_runtime != nil) // XXX better check atomically
panic("libgolang: double init");
......@@ -138,6 +139,7 @@ void _libgolang_init(const _libgolang_runtime_ops *runtime_ops) {
internal::atomic::_init();
os::signal::_init();
time::_init();
}
void _taskgo(void (*f)(void *), void *arg) {
......
......@@ -21,8 +21,24 @@
// See time.h for package overview.
#include "golang/time.h"
#include "timer-wheel.h"
#include <math.h>
#define DEBUG 0
#if DEBUG
# define debugf(format, ...) fprintf(stderr, format, ##__VA_ARGS__)
#else
# define debugf(format, ...) do {} while (0)
#endif
// golang::sync:: (private imports)
namespace golang {
namespace sync {
bool _semaacquire_timed(_sema *sema, uint64_t timeout_ns);
}} // golang::sync::
// golang::time:: (except sleep and now)
......@@ -30,7 +46,6 @@ namespace golang {
namespace time {
// ---- timers ----
// FIXME timers are implemented very inefficiently - each timer currently consumes a goroutine.
Ticker new_ticker(double dt);
Timer new_timer (double dt);
......@@ -56,7 +71,7 @@ Timer new_timer(double dt) {
}
// Ticker
// Ticker (small wrapper around Timer)
_Ticker::_Ticker() {}
_Ticker::~_Ticker() {}
void _Ticker::decref() {
......@@ -72,9 +87,7 @@ Ticker new_ticker(double dt) {
tx->c = makechan<double>(1); // 1-buffer -- same as in Go
tx->_dt = dt;
tx->_stop = false;
go([tx]() {
tx->_tick();
});
tx->_timer = after_func(dt, [tx]() { tx ->_tick(); });
return tx;
}
......@@ -83,6 +96,10 @@ void _Ticker::stop() {
tx._mu.lock();
tx._stop = true;
if (tx._timer != nil) {
tx._timer->stop();
tx._timer = nil; // break Ticker -> Timer -> _tick -> Ticker cycle
}
// drain what _tick could have been queued already
while (tx.c.len() > 0)
......@@ -93,16 +110,15 @@ void _Ticker::stop() {
void _Ticker::_tick() {
_Ticker &tx = *this;
while (1) {
// XXX adjust for accumulated error δ?
sleep(tx._dt);
tx._mu.lock();
if (tx._stop) {
tx._mu.unlock();
return;
}
// XXX adjust for accumulated error δ?
tx._timer->reset(tx._dt);
// send from under ._mu so that .stop can be sure there is no
// ongoing send while it drains the channel.
double t = now();
......@@ -111,90 +127,361 @@ void _Ticker::_tick() {
tx.c.sends(&t),
});
tx._mu.unlock();
}
}
// Timer
// Timers
//
// Timers are implemented via Timer Wheel.
// For this time arrow is divided into equal periods named ticks, and Ratas
// library[1] is used to manage timers with granularity of ticks. We employ
// ticks to avoid unnecessary overhead of managing timeout-style timers with
// nanosecond precision.
//
// Let g denote tick granularity.
//
// The timers are provided with guaranty that their expiration happens after
// requested expiration time. In other words the following invariant is always true:
//
// t(exp) ≤ t(fire)
//
// we also want that firing _ideally_ happens not much far away from requested
// expiration time, meaning that the following property is aimed for, but not guaranteed:
//
// t(fire) < t(exp) + g
//
// a tick Ti is associated with [i-1,i)·g time range. It is said that tick Ti
// "happens" at i·g point in time. Firing of timers associated with tick Ti is
// done when Ti happens - ideally at i·g time or strictly speaking ≥ that point.
//
// When timers are armed their expiration tick is set as Texp = ⌊t(exp)/g+1⌋ to
// be in time range that tick Texp covers.
//
//
// A special goroutine, _timer_loop, is dedicated to advance time of the
// timer-wheel as ticks happen, and to run expired timers. When there is
// nothing to do that goroutine pauses itself and goes to sleep until either
// next expiration moment, or until new timer with earlier expiration time is
// armed. To be able to simultaneously select on those two condition a
// semaphore with acquisition timeout is employed. Please see _tSema for
// details.
//
//
// [1] Ratas - A hierarchical timer wheel.
// https://www.snellman.net/blog/archive/2016-07-27-ratas-hierarchical-timer-wheel,
// https://github.com/jsnell/ratas
// Tns indicates time measured in nanoseconds.
// It is used for documentation purposes mainly to distinguish from the time measured in ticks.
typedef uint64_t Tns;
// _tick_g is ticks granularity in nanoseconds.
static const Tns _tick_g = 1024; // 1 tick is ~ 1 μs
// timer-wheel holds registry of all timers and manages them.
static sync::Mutex* _tWheelMu; // lock for timer wheel + sleep/wakeup channel (see _tSema & co below)
static TimerWheel* _tWheel; // for each timer the wheel holds 1 reference to _TimerImpl object
// _TimerImpl amends _Timer with timer-wheel entry and implementation-specific state.
enum _TimerState {
_TimerDisarmed, // timer is not registered to timer wheel and is not firing
_TimerArmed, // timer is registered to timer wheel and is not firing
_TimerFiring // timer is currently firing (and not on the timer wheel)
};
struct _TimerImpl : _Timer {
void _fire();
void _queue_fire();
MemberTimerEvent<_TimerImpl, &_TimerImpl::_queue_fire> _tWheelEntry;
func<void()> _f;
sync::Mutex _mu;
_TimerState _state;
// entry on "firing" list; see _tFiring for details
_TimerImpl* _tFiringNext; // TODO could reuse _tWheelEntry.{next_,prev_} for "firing" list
_TimerImpl();
~_TimerImpl();
};
_TimerImpl::_TimerImpl() : _tWheelEntry(this) {}
_TimerImpl::~_TimerImpl() {}
_Timer::_Timer() {}
_Timer::~_Timer() {}
void _Timer::decref() {
if (__decref())
delete this;
delete static_cast<_TimerImpl*>(this);
}
// _tSema and _tSleeping + _tWaking organize sleep/wakeup channel.
//
// Timer loop uses wakeup sema to both:
// * sleep until next timer expires, and
// * become woken up earlier if new timer with earlier expiration time is armed
//
// _tSleeping + _tWaking are used by the timer loop and clients to coordinate
// _tSema operations, so that the value of sema is always 0 or 1, and that
// every new loop cycle starts with sema=0, meaning that sema.Acquire will block.
//
// Besides same.Acquire, all operations on the sleep/wakeup channel are done under _tWheelMu.
static sync::_sema* _tSema;
static bool _tSleeping; // 1 iff timer loop:
// \/ decided to go to sleep on wakeup sema
// \/ sleeps on wakeup sema via Acquire
// \/ woken up after Acquire before setting _tSleeping=0 back
static bool _tWaking; // 1 iff client timer arm:
// /\ saw _tSleeping=1 && _tWaking=0 and decided to do wakeup
// /\ (did Release \/ will do Release)
// /\ until timer loop set back _tWaking=0
static Tns _tSleeping_until; // until when timer loop is sleeping if _tSleeping=1
// _timer_loop implements timer loop: it runs in dedicated goroutine ticking the
// timer-wheel and sleeping in between ticks.
static void _timer_loop();
static void _timer_loop_fire_queued();
void _init() {
_tWheelMu = new sync::Mutex();
_tWheel = new TimerWheel(_nanotime() / _tick_g);
_tSema = sync::_makesema(); sync::_semaacquire(_tSema); // 1 -> 0
_tSleeping = false;
_tWaking = false;
_tSleeping_until = 0;
go(_timer_loop);
}
static void _timer_loop() {
while (1) {
// tick the wheel. This puts expired timers on firing list but delays
// really firing them until we release _tWheelMu.
_tWheelMu->lock();
Tick now_t = _nanotime() / _tick_g;
Tick wnow_t = _tWheel->now();
Tick wdt_t = now_t - wnow_t;
debugf("LOOP: now_t: %lu wnow_t: %lu δ_t %lu ...\n", now_t, wnow_t, wdt_t);
if (now_t > wnow_t) // advance(0) panics. Avoid that if we wake up earlier
_tWheel->advance(wdt_t); // inside the same tick, e.g. due to signal.
_tWheelMu->unlock();
// fire the timers queued on the firing list
_timer_loop_fire_queued();
// go to sleep until next timer expires or wakeup comes from new arming.
//
// limit max sleeping time because contrary to other wheel operations -
// - e.g. insert and delete which are O(1), the complexity of
// ticks_to_next_event is O(time till next expiry).
Tns tsleep_max = 1*1E9; // 1s
bool sleeping = false;
_tWheelMu->lock();
Tick wsleep_t = _tWheel->ticks_to_next_event(tsleep_max / _tick_g);
Tick wnext_t = _tWheel->now() + wsleep_t;
Tns tnext = wnext_t * _tick_g;
Tns tnow = _nanotime();
if (tnext > tnow) {
_tSleeping = sleeping = true;
_tSleeping_until = tnext;
}
_tWheelMu->unlock();
if (!sleeping)
continue;
Tns tsleep = tnext - tnow;
debugf("LOOP: sleeping %.3f μs ...\n", tsleep / 1e3);
bool acq = sync::_semaacquire_timed(_tSema, tsleep);
// bring sleep/wakeup channel back into reset state with S=0
_tWheelMu->lock();
// acq ^ waking Release was done while Acquire was blocked S=0
// acq ^ !waking impossible
// !acq ^ waking Acquire finished due to timeout; Release was done after that S=1
// !acq ^ !waking Acquire finished due to timeout; no Release was done at all S=0
debugf("LOOP: woken up acq=%d waking=%d\n", acq, _tWaking);
if ( acq && !_tWaking) {
_tWheelMu->unlock();
panic("BUG: timer loop: woken up with acq ^ !waking");
}
if (!acq && _tWaking) {
acq = sync::_semaacquire_timed(_tSema, 0); // S=1 -> acquire should be immediate
if (!acq) {
_tWheelMu->unlock();
panic("BUG: timer loop: reacquire after acq ^ waking failed");
}
}
_tSleeping = false;
_tWaking = false;
_tSleeping_until = 0;
_tWheelMu->unlock();
}
}
Timer _new_timer(double dt, func<void()> f) {
Timer t = adoptref(new _Timer());
t->c = (f == nil ? makechan<double>(1) : nil);
t->_f = f;
t->_dt = INFINITY;
t->_ver = 0;
_TimerImpl* _t = new _TimerImpl();
_t->c = (f == nil ? makechan<double>(1) : nil);
_t->_f = f;
_t->_state = _TimerDisarmed;
_t->_tFiringNext = nil;
Timer t = adoptref(static_cast<_Timer*>(_t));
t->reset(dt);
return t;
}
void _Timer::reset(double dt) {
_Timer &t = *this;
_TimerImpl& t = *static_cast<_TimerImpl*>(this);
if (dt <= 0)
dt = 0;
Tns when = _nanotime() + Tns(dt*1e9);
Tick when_t = when / _tick_g + 1; // Ti covers [i-1,i)·g
_tWheelMu->lock();
t._mu.lock();
if (t._dt != INFINITY) {
if (t._state != _TimerDisarmed) {
t._mu.unlock();
_tWheelMu->unlock();
panic("Timer.reset: the timer is armed; must be stopped or expired");
}
t._dt = dt;
t._ver += 1;
// TODO rework timers so that new timer does not spawn new goroutine.
Timer tref = newref(&t); // pass t reference to spawned goroutine
go([tref, dt](int ver) {
tref->_fire(dt, ver);
}, t._ver);
t._state = _TimerArmed;
Tick wnow_t = _tWheel->now();
Tick wdt_t;
if (when_t > wnow_t)
wdt_t = when_t - wnow_t;
else
wdt_t = 1; // schedule(0) panics
// the wheel will keep a reference to the timer
t.incref();
_tWheel->schedule(&t._tWheelEntry, wdt_t);
t._mu.unlock();
// wakeup timer loop if it is sleeping until later than new timer expiry
if (_tSleeping) {
if ((when < _tSleeping_until) && !_tWaking) {
debugf("USER: waking up loop\n");
_tWaking = true;
sync::_semarelease(_tSema);
}
}
_tWheelMu->unlock();
}
bool _Timer::stop() {
_Timer &t = *this;
_TimerImpl& t = *static_cast<_TimerImpl*>(this);
bool canceled;
_tWheelMu->lock();
t._mu.lock();
if (t._dt == INFINITY) {
switch (t._state) {
case _TimerDisarmed:
canceled = false;
}
else {
t._dt = INFINITY;
t._ver += 1;
break;
case _TimerArmed:
// timer wheel is holding this timer entry. Remove it from there.
t._tWheelEntry.cancel();
t.decref();
canceled = true;
break;
case _TimerFiring:
// the timer is on "firing" list. Timer loop will process it and skip
// upon seeing ._state = _TimerDisarmed. It will also be the timer loop
// to drop the reference to the timer that timer-wheel was holding.
canceled = true;
break;
default:
panic("invalid timer state");
}
if (canceled)
t._state = _TimerDisarmed;
// drain what _fire could have been queued already
while (t.c.len() > 0)
t.c.recv();
t._mu.unlock();
_tWheelMu->unlock();
return canceled;
}
void _Timer::_fire(double dt, int ver) {
_Timer &t = *this;
// when timers are fired by _tWheel.advance(), they are first popped from _tWheel and put on
// _tFiring list, so that the real firing could be done without holding _tWheelMu.
static _TimerImpl* _tFiring = nil;
static _TimerImpl* _tFiringLast = nil;
void _TimerImpl::_queue_fire() {
_TimerImpl& t = *this;
sleep(dt);
t._mu.lock();
if (t._ver != ver) {
assert(t._state == _TimerArmed);
t._state = _TimerFiring;
t._mu.unlock();
return; // the timer was stopped/resetted - don't fire it
t._tFiringNext = nil;
if (_tFiring == nil)
_tFiring = &t;
if (_tFiringLast != nil)
_tFiringLast->_tFiringNext = &t;
_tFiringLast = &t;
}
static void _timer_loop_fire_queued() {
for (_TimerImpl* t = _tFiring; t != nil;) {
_TimerImpl* fnext = t->_tFiringNext;
t->_tFiringNext = nil;
t->_fire();
t->decref(); // wheel was holding a reference to the timer
t = fnext;
}
t._dt = INFINITY;
_tFiring = nil;
_tFiringLast = nil;
}
void _TimerImpl::_fire() {
_TimerImpl& t = *this;
bool fire = false;
t._mu.lock();
if (t._state == _TimerFiring) { // stop could disarm the timer in the meantime
t._state = _TimerDisarmed;
fire = true;
debugf("LOOP: firing @ %lu ...\n", t._tWheelEntry.scheduled_at());
// send under ._mu so that .stop can be sure that if it sees
// ._dt = INFINITY, there is no ongoing .c send.
if (t._f == nil) {
// ._state = _TimerDisarmed, there is no ongoing .c send.
if (t._f == nil)
t.c.send(now());
t._mu.unlock();
return;
}
t._mu.unlock();
// call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer.
if (fire && t._f != nil)
t._f();
}
......
#ifndef _NXD_LIBGOLANG_TIME_H
#define _NXD_LIBGOLANG_TIME_H
// Copyright (C) 2019-2023 Nexedi SA and Contributors.
// Copyright (C) 2019-2024 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -118,6 +118,7 @@ private:
double _dt;
sync::Mutex _mu;
bool _stop;
Timer _timer;
// don't new - create only via new_ticker()
private:
......@@ -147,18 +148,12 @@ LIBGOLANG_API Timer new_timer(double dt);
struct _Timer : object {
chan<double> c;
private:
func<void()> _f;
sync::Mutex _mu;
double _dt; // +inf - stopped, otherwise - armed
int _ver; // current timer was armed by n'th reset
// don't new - create only via new_timer() & co
private:
_Timer();
~_Timer();
friend Timer _new_timer(double dt, func<void()> f);
friend class _TimerImpl;
public:
LIBGOLANG_API void decref();
......@@ -182,9 +177,6 @@ public:
//
// the timer must be either already stopped or expired.
LIBGOLANG_API void reset(double dt);
private:
void _fire(double dt, int ver);
};
......
......@@ -23,6 +23,7 @@ from __future__ import print_function, absolute_import
from golang import select, func, defer
from golang import time, sync
from golang.golang_test import panics
from six.moves import range as xrange
# all timer tests operate in dt units
dt = 10*time.millisecond
......@@ -222,3 +223,22 @@ def test_timer_reset_armed():
t = time.Timer(10*dt); defer(t.stop)
with panics("Timer.reset: the timer is armed; must be stopped or expired"):
t.reset(5*dt)
# bench_timer_arm_cancel benchmarks arming timers that do not fire.
# it shows how cheap or expensive it is to use timers to implement timeouts.
def bench_timer_arm_cancel(b):
for i in xrange(b.N):
t = time.Timer(10*time.second)
_ = t.stop()
assert _ is True
# bench_timer_arm_fire benchmarks arming timers that do fire.
# it shows what it costs to go through all steps related to timer loop and firing timers.
def bench_timer_arm_fire(b):
wg = sync.WaitGroup()
wg.add(b.N)
for i in xrange(b.N):
t = time.after_func(1*time.millisecond, wg.done)
wg.wait()
......@@ -229,8 +229,11 @@ setup(
'golang/os/signal.h',
'golang/strings.h',
'golang/sync.h',
'golang/time.h'],
include_dirs = ['3rdparty/include'],
'golang/time.h',
'3rdparty/ratas/src/timer-wheel.h'],
include_dirs = [
'3rdparty/include',
'3rdparty/ratas/src'],
define_macros = [('BUILDING_LIBGOLANG', None)],
soversion = '0.1'),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment