Commit 9c260fde authored by Kirill Smelkov's avatar Kirill Smelkov

time: New package that mirrors Go's time

Add time.Timer, time.Ticker and convenience functions time.tick,
time.after and time.after_func. These will be used in context to support
deadlines and timeouts.

While at time topic, also provide sleep and now from golang.time, so
that there is no need to import both golang.time and stdlib's time in a
file.

Provide time constants in the module as they are useful to have and
mirror constants provided by Go's time.

Note: timers implementation is very suboptimal for now.
parent 6e3b3ff4
...@@ -186,6 +186,8 @@ handle concurrency in structured ways: ...@@ -186,6 +186,8 @@ handle concurrency in structured ways:
on a common task. It also provides low-level primitives - for example on a common task. It also provides low-level primitives - for example
`sync.Once` and `sync.WaitGroup` - that are sometimes useful too. `sync.Once` and `sync.WaitGroup` - that are sometimes useful too.
- `golang.time` provides timers integrated with channels.
See `Go Concurrency Patterns: Context`__ for overview of contexts. See `Go Concurrency Patterns: Context`__ for overview of contexts.
__ https://blog.golang.org/context __ https://blog.golang.org/context
......
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package time mirrors Go package time.
See the following link about Go time package:
https://golang.org/pkg/time
"""
from __future__ import print_function, absolute_import
import time as _time
from golang import go, chan, select, default, nilchan, panic
import threading
# golang/py - the same as std python - represents time as float
second = 1.0
nanosecond = 1E-9 * second
microsecond = 1E-6 * second
millisecond = 1E-3 * second
minute = 60 * second
hour = 60 * minute
sleep = _time.sleep
now = _time.time
# ---- timers ----
# FIXME timers are implemented very inefficiently - each timer currently consumes a goroutine.
# tick returns channel connected to dt ticker.
#
# Note: there is no way to stop created ticker.
# Note: for dt <= 0, contrary to Ticker, tick returns nilchan instead of panicking.
def tick(dt): # -> chan time
if dt <= 0:
return nilchan
return Ticker(dt).c
# after returns channel connected to dt timer.
#
# Note: with after there is no way to stop/garbage-collect created timer until it fires.
def after(dt): # -> chan time
return Timer(dt).c
# after_func arranges to call f after dt time.
#
# The function will be called in its own goroutine.
# Returned timer can be used to cancel the call.
def after_func(dt, f): # -> Timer
return Timer(dt, f=f)
# Ticker arranges for time events to be sent to .c channel on dt-interval basis.
#
# If the receiver is slow, Ticker does not queue events and skips them.
# Ticking can be canceled via .stop() .
class Ticker(object):
def __init__(self, dt):
if dt <= 0:
panic("ticker: dt <= 0")
self.c = chan(1) # 1-buffer -- same as in Go
self._dt = dt
self._mu = threading.Lock()
self._stop = False
go(self._tick)
# stop cancels the ticker.
#
# It is guaranteed that ticker channel is empty after stop completes.
def stop(self):
with self._mu:
self._stop = True
# drain what _tick could have been queued already
while len(self.c) > 0:
self.c.recv()
def _tick(self):
while 1:
# XXX adjust for accumulated error δ?
sleep(self._dt)
with self._mu:
if self._stop:
return
# send from under ._mu so that .stop can be sure there is no
# ongoing send while it drains the channel.
select(
default,
(self.c.send, now()),
)
# Timer arranges for time event to be sent to .c channel after dt time.
#
# The timer can be stopped (.stop), or reinitialized to another time (.reset).
#
# If func f is provided - when the timer fires f is called in its own goroutine
# instead of event being sent to channel .c .
class Timer(object):
def __init__(self, dt, f=None):
self._f = f
self.c = chan(1) if f is None else nilchan
self._mu = threading.Lock()
self._dt = None # None - stopped, float - armed
self._ver = 0 # current timer was armed by n'th reset
self.reset(dt)
# stop cancels the timer.
#
# It returns:
#
# False: the timer was already expired or stopped,
# True: the timer was armed and canceled by this stop call.
#
# Note: contrary to Go version, there is no need to drain timer channel
# after stop call - it is guaranteed that after stop the channel is empty.
#
# Note: similarly to Go, if Timer is used with function - it is not
# guaranteed that after stop the function is not running - in such case
# the caller must explicitly synchronize with that function to complete.
def stop(self): # -> canceled
with self._mu:
if self._dt is None:
canceled = False
else:
self._dt = None
self._ver += 1
canceled = True
# drain what _fire could have been queued already
while len(self.c) > 0:
self.c.recv()
return canceled
# reset rearms the timer.
#
# the timer must be either already stopped or expired.
def reset(self, dt):
with self._mu:
if self._dt is not None:
panic("Timer.reset: the timer is armed; must be stopped or expired")
self._dt = dt
self._ver += 1
go(self._fire, dt, self._ver)
def _fire(self, dt, ver):
sleep(dt)
with self._mu:
if self._ver != ver:
return # the timer was stopped/resetted - don't fire it
self._dt = None
# send under ._mu so that .stop can be sure that if it sees
# ._dt = None, there is no ongoing .c send.
if self._f is None:
self.c.send(now())
return
# call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer.
self._f()
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import print_function, absolute_import
from golang import select, _PanicError
from golang import time
from pytest import raises
# all timer tests operate in dt units
dt = 10*time.millisecond
# test_timer verifies that Timer/Ticker fire as expected.
def test_timer():
# start timers at x5, x7 and x11 intervals an verify that the timers fire
# in expected sequence. The times when the timers fire do not overlap in
# checked range because intervals are prime and chosen so that they start
# overlapping only after 35 (=5·7).
tv = [] # timer events
Tstart = time.now()
t23 = time.Timer(23*dt)
t5 = time.Timer( 5*dt)
def _():
tv.append(7)
t7f.reset(7*dt)
t7f = time.Timer( 7*dt, f=_)
tx11 = time.Ticker(11*dt)
while 1:
_, _rx = select(
t23.c.recv, # 0
t5 .c.recv, # 1
t7f.c.recv, # 2
tx11.c.recv, # 3
)
if _ == 0:
tv.append(23)
break
if _ == 1:
tv.append(5)
t5.reset(5*dt)
if _ == 2:
assert False, "t7f sent to channel; must only call func"
if _ == 3:
tv.append(11)
Tend = time.now()
assert (Tend - Tstart) >= 23*dt
assert tv == [ 5, 7, 5, 11, 7, 5, 5, 7,11,23]
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
# test_timer_misc, similarly to test_timer, verifies misc timer convenience functions.
def test_timer_misc():
tv = []
Tstart = time.now()
c23 = time.after(23*dt)
c5 = time.after( 5*dt)
def _():
tv.append(7)
t7f.reset(7*dt)
t7f = time.after_func(7*dt, _)
cx11 = time.tick(11*dt)
while 1:
_, _rx = select(
c23.recv, # 0
c5 .recv, # 1
t7f.c.recv, # 2
cx11.recv, # 3
)
if _ == 0:
tv.append(23)
break
if _ == 1:
tv.append(5)
# NOTE 5 does not rearm in this test because there is no way to
# rearm timer create by time.after().
if _ == 2:
assert False, "t7f sent to channel; must only call func"
if _ == 3:
tv.append(11)
Tend = time.now()
assert (Tend - Tstart) >= 23*dt
assert tv == [ 5, 7, 11, 7, 7,11,23]
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
# test_timer_stop verifies that .stop() cancels Timer or Ticker.
def test_timer_stop():
tv = []
t10 = time.Timer (10*dt)
t2 = time.Timer ( 2*dt) # will fire and cancel t3, tx5
t3 = time.Timer ( 3*dt) # will be canceled
tx5 = time.Ticker( 5*dt) # will be canceled
while 1:
_, _rx = select(
t10.c.recv, # 0
t2 .c.recv, # 1
t3 .c.recv, # 2
tx5.c.recv, # 3
)
if _ == 0:
tv.append(10)
break
if _ == 1:
tv.append(2)
t3.stop()
tx5.stop()
if _ == 2:
tv.append(3)
if _ == 3:
tv.append(5)
assert tv == [ 2, 10]
# 1 2 3 4 5 6 7 8 9 10
# test_timer_stop_drain verifies that Timer/Ticker .stop() drains timer channel.
def test_timer_stop_drain():
t = time.Timer (1*dt)
tx = time.Ticker(1*dt)
time.sleep(2*dt)
assert len(t.c) == 1
assert len(tx.c) == 1
assert t.stop() == False
assert len(t.c) == 0
tx.stop()
assert len(tx.c) == 0
# test_timer_reset_armed verifies that .reset() panics if called on armed timer.
def test_timer_reset_armed():
# reset while armed
t = time.Timer(10*dt)
with raises(_PanicError):
t.reset(5*dt)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment