Commit b2450310 authored by Kirill Smelkov's avatar Kirill Smelkov

context: Add support for deadlines

Implement deadlines / timeouts using timers added recently in 9c260fde
(time: New package that mirrors Go's time).
parent 27f91b78
...@@ -179,8 +179,8 @@ Concurrency ...@@ -179,8 +179,8 @@ Concurrency
In addition to `go` and channels, the following packages are provided to help In addition to `go` and channels, the following packages are provided to help
handle concurrency in structured ways: handle concurrency in structured ways:
- `golang.context` provides contexts to propagate cancellation and task-scoped - `golang.context` provides contexts to propagate deadlines, cancellation and
values among spawned goroutines. task-scoped values among spawned goroutines.
- `golang.sync` provides `sync.WorkGroup` to spawn group of goroutines working - `golang.sync` provides `sync.WorkGroup` to spawn group of goroutines working
on a common task. It also provides low-level primitives - for example on a common task. It also provides low-level primitives - for example
......
...@@ -28,13 +28,18 @@ See the following links about Go contexts: ...@@ -28,13 +28,18 @@ See the following links about Go contexts:
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang import go, chan, select, default, nilchan from golang import go, chan, select, default, nilchan
from golang import time
import threading import threading
# Context is the interface that every context must implement. # Context is the interface that every context must implement.
# #
# A context carries cancellation signal and immutable context-local # A context carries deadline, cancellation signal and immutable context-local
# key -> value dict. # key -> value dict.
class Context(object): class Context(object):
# deadline() returns context deadline or None, if there is no deadline.
def deadline(ctx): # -> time | None
raise NotImplementedError()
# done returns channel that is closed when the context is canceled. # done returns channel that is closed when the context is canceled.
def done(ctx): # -> chan def done(ctx): # -> chan
raise NotImplementedError() raise NotImplementedError()
...@@ -47,9 +52,6 @@ class Context(object): ...@@ -47,9 +52,6 @@ class Context(object):
def value(ctx, key): # -> value | None def value(ctx, key): # -> value | None
raise NotImplementedError() raise NotImplementedError()
# TODO:
# .deadline()
# background returns empty context that is never canceled. # background returns empty context that is never canceled.
def background(): # -> Context def background(): # -> Context
...@@ -80,6 +82,36 @@ def with_cancel(parent): # -> ctx, cancel ...@@ -80,6 +82,36 @@ def with_cancel(parent): # -> ctx, cancel
def with_value(parent, key, value): # -> ctx def with_value(parent, key, value): # -> ctx
return _ValueCtx({key: value}, parent) return _ValueCtx({key: value}, parent)
# with_deadline creates new context with deadline.
#
# The deadline of created context is the earliest of provided deadline or
# deadline of parent. Created context will be canceled when time goes past
# context deadline or cancel called, whichever happens first.
#
# The caller should explicitly call cancel to release context resources as soon
# the context is no longer needed.
def with_deadline(parent, deadline): # -> ctx, cancel
# parent's deadline is before deadline -> just use parent
pdead = parent.deadline()
if pdead is not None and pdead <= deadline:
return with_cancel(parent)
# timeout <= 0 -> already canceled
timeout = deadline - time.now()
if timeout <= 0:
ctx, cancel = with_cancel(parent)
cancel()
return ctx, cancel
ctx = _TimeoutCtx(timeout, deadline, parent)
return ctx, lambda: ctx._cancel(canceled)
# with_timeout creates new context with timeout.
#
# it is shorthand for with_deadline(parent, now+timeout).
def with_timeout(parent, timeout): # -> ctx, cancel
return with_deadline(parent, time.now() + timeout)
# merge merges 2 contexts into 1. # merge merges 2 contexts into 1.
# #
# The result context: # The result context:
...@@ -110,6 +142,9 @@ class _Background(object): ...@@ -110,6 +142,9 @@ class _Background(object):
def value(bg, key): def value(bg, key):
return None return None
def deadline(bg):
return None
_background = _Background() _background = _Background()
# _BaseCtx is the common base for Contexts implemented in this package. # _BaseCtx is the common base for Contexts implemented in this package.
...@@ -149,6 +184,16 @@ class _BaseCtx(object): ...@@ -149,6 +184,16 @@ class _BaseCtx(object):
return v return v
return None return None
# deadline returns the earliest deadline of parents.
# this behaviour is inherited by all contexts except _TimeoutCtx who overrides it.
def deadline(ctx):
d = None
for parent in ctx._parentv:
pd = parent.deadline()
if d is None or (pd is not None and pd < d):
d = pd
return d
# _cancel cancels ctx and its children. # _cancel cancels ctx and its children.
def _cancel(ctx, err): def _cancel(ctx, err):
return ctx._cancelFrom(None, err) return ctx._cancelFrom(None, err)
...@@ -242,6 +287,24 @@ class _ValueCtx(_BaseCtx): ...@@ -242,6 +287,24 @@ class _ValueCtx(_BaseCtx):
return super(_ValueCtx, ctx).value(key) return super(_ValueCtx, ctx).value(key)
# _TimeoutCtx is context that is canceled on timeout.
class _TimeoutCtx(_CancelCtx):
def __init__(ctx, timeout, deadline, parent):
super(_TimeoutCtx, ctx).__init__(parent)
assert timeout > 0
ctx._deadline = deadline
ctx._timer = time.after_func(timeout, lambda: ctx._cancel(deadlineExceeded))
def deadline(ctx):
return ctx._deadline
# cancel -> stop timer
def _cancelFrom(ctx, cancelFrom, err):
super(_TimeoutCtx, ctx)._cancelFrom(cancelFrom, err)
ctx._timer.stop()
# _ready returns whether channel ch is ready. # _ready returns whether channel ch is ready.
def _ready(ch): def _ready(ch):
_, _rx = select( _, _rx = select(
......
...@@ -20,12 +20,14 @@ ...@@ -20,12 +20,14 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang import context, nilchan from golang import context, time, nilchan
from golang.context import _ready as ready from golang.context import _ready as ready
from golang.time_test import dt
# assertCtx asserts on state of _BaseCtx* # assertCtx asserts on state of _BaseCtx*
def assertCtx(ctx, children, err=None, done=False): def assertCtx(ctx, children, deadline=None, err=None, done=False):
assert isinstance(ctx, context._BaseCtx) assert isinstance(ctx, context._BaseCtx)
assert ctx.deadline() == deadline
assert ctx.err() is err assert ctx.err() is err
assert ready(ctx.done()) == done assert ready(ctx.done()) == done
assert ctx._children == children assert ctx._children == children
...@@ -37,9 +39,12 @@ Y = True ...@@ -37,9 +39,12 @@ Y = True
bg = context.background() bg = context.background()
# test_context exercises with_cancel / with_value and merge.
# deadlines are tested in test_deadline.
def test_context(): def test_context():
assert bg.err() is None assert bg.err() is None
assert bg.done() is nilchan assert bg.done() is nilchan
assert bg.deadline() is None
assert not ready(bg.done()) assert not ready(bg.done())
assert bg.value("hello") is None assert bg.value("hello") is None
...@@ -143,3 +148,83 @@ def test_context(): ...@@ -143,3 +148,83 @@ def test_context():
assertCtx(ctx121, Z, err=C, done=Y) assertCtx(ctx121, Z, err=C, done=Y)
assertCtx(ctx1211, Z, err=C, done=Y) assertCtx(ctx1211, Z, err=C, done=Y)
assertCtx(ctxM, Z, err=C, done=Y) assertCtx(ctxM, Z, err=C, done=Y)
# test_deadline exercises deadline-related context functionality.
def test_deadline():
t0 = time.now()
d1 = t0 + 10*dt
d2 = t0 + 20*dt
d3 = t0 + 30*dt
ctx1, cancel1 = context.with_deadline(bg, d2)
assert ctx1.done() is not bg.done()
assertCtx(ctx1, Z, deadline=d2)
ctx11 = context.with_value(ctx1, "a", "b")
assert ctx11.done() is ctx1.done()
assert ctx11.value("a") == "b"
assertCtx(ctx1, {ctx11}, deadline=d2)
assertCtx(ctx11, Z, deadline=d2)
ctx111, cancel111 = context.with_cancel(ctx11)
assert ctx111.done() is not ctx11.done
assertCtx(ctx1, {ctx11}, deadline=d2)
assertCtx(ctx11, {ctx111}, deadline=d2)
assertCtx(ctx111, Z, deadline=d2)
ctx1111, cancel1111 = context.with_deadline(ctx111, d3) # NOTE deadline > parent
assert ctx1111.done() is not ctx111.done()
assertCtx(ctx1, {ctx11}, deadline=d2)
assertCtx(ctx11, {ctx111}, deadline=d2)
assertCtx(ctx111, {ctx1111}, deadline=d2)
assertCtx(ctx1111, Z, deadline=d2) # NOTE not d3
ctx12, cancel12 = context.with_deadline(ctx1, d1)
assert ctx12.done() is not ctx1.done()
assertCtx(ctx1, {ctx11, ctx12}, deadline=d2)
assertCtx(ctx11, {ctx111}, deadline=d2)
assertCtx(ctx111, {ctx1111}, deadline=d2)
assertCtx(ctx1111, Z, deadline=d2)
assertCtx(ctx12, Z, deadline=d1)
ctxM, cancelM = context.merge(ctx1111, ctx12)
assert ctxM.done() is not ctx1111.done()
assert ctxM.done() is not ctx12.done()
assert ctxM.value("a") == "b"
assertCtx(ctx1, {ctx11, ctx12}, deadline=d2)
assertCtx(ctx11, {ctx111}, deadline=d2)
assertCtx(ctx111, {ctx1111}, deadline=d2)
assertCtx(ctx1111, {ctxM}, deadline=d2)
assertCtx(ctx12, {ctxM}, deadline=d1)
assertCtx(ctxM, Z, deadline=d1)
time.sleep(11*dt)
assertCtx(ctx1, {ctx11}, deadline=d2)
assertCtx(ctx11, {ctx111}, deadline=d2)
assertCtx(ctx111, {ctx1111}, deadline=d2)
assertCtx(ctx1111, Z, deadline=d2)
assertCtx(ctx12, Z, deadline=d1, err=D, done=Y)
assertCtx(ctxM, Z, deadline=d1, err=D, done=Y)
# explicit cancel first -> err=canceled instead of deadlineExceeded
for i in range(2):
cancel1()
assertCtx(ctx1, Z, deadline=d2, err=C, done=Y)
assertCtx(ctx11, Z, deadline=d2, err=C, done=Y)
assertCtx(ctx111, Z, deadline=d2, err=C, done=Y)
assertCtx(ctx1111, Z, deadline=d2, err=C, done=Y)
assertCtx(ctx12, Z, deadline=d1, err=D, done=Y)
assertCtx(ctxM, Z, deadline=d1, err=D, done=Y)
# with_timeout
ctx, cancel = context.with_timeout(bg, 10*dt)
assert ctx.done() is not bg.done()
d = ctx.deadline()
assert abs(d - (time.now() + 10*dt)) < 1*dt
assertCtx(ctx, Z, deadline=d)
time.sleep(11*dt)
assertCtx(ctx, Z, deadline=d, err=D, done=Y)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment