From f3f3ec899a0399b60bd9203803094cddd203acdb Mon Sep 17 00:00:00 2001
From: Kirill Smelkov <kirr@nexedi.com>
Date: Mon, 12 Dec 2022 14:24:14 +0300
Subject: [PATCH] kpi += Calc

kpi.Calc is calculator to compute KPIs. It can be instantiated on
MeasurementLog and time interval over which to perform computations.
It currently implements calculations for only one "E-RAB Accessibility KPI".

Please see added docstrings and tests for details.

The next patch will also add demo program that uses all kpi.Calc and
other parts of KPI-computation pipeline to build and visualize E-RAB
Accessibility from real data.
---
 kpi.py      | 296 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 kpi_test.py | 277 +++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 571 insertions(+), 2 deletions(-)

diff --git a/kpi.py b/kpi.py
index b00fa52..21a8c94 100644
--- a/kpi.py
+++ b/kpi.py
@@ -17,7 +17,11 @@
 #
 # See COPYING file for full licensing terms.
 # See https://www.nexedi.com/licensing for rationale and options.
-"""Package kpi will provide functionality to compute Key Performance Indicators of LTE services.
+"""Package kpi provides functionality to compute Key Performance Indicators of LTE services.
+
+- Calc is KPI calculator. It can be instantiated on MeasurementLog and time
+  interval over which to perform computations. Use Calc methods such as
+  .erab_accessibility() to compute KPIs.
 
 - MeasurementLog maintains journal with result of measurements. Use .append()
   to populate it with data.
@@ -25,18 +29,52 @@
 - Measurement represents measurement results. Its documentation establishes
   semantic for measurement results to be followed by drivers.
 
+To actually compute a KPI for particular LTE service, a measurements driver
+should exist for that LTE service(*). KPI computation pipeline is then as follows:
+
+     ─────────────
+    │ Measurement │  Measurements   ────────────────       ──────
+    │             │ ─────────────→ │ MeasurementLog │ ──→ │ Calc │ ──→  KPI
+    │   driver    │                 ────────────────       ──────
+     ─────────────
+
 
 See following 3GPP standards for KPI-related topics:
 
     - TS 32.401
     - TS 32.450
     - TS 32.425
+
+(*) for example package amari.kpi provides such measurements driver for Amarisoft LTE stack.
 """
 
 import numpy as np
 from golang import func
 
 
+# Calc provides way to compute KPIs over given measurement data and time interval.
+#
+# It is constructed from MeasurementLog and [Ï„_lo, Ï„_hi) and further provides
+# following methods for computing 3GPP KPIs:
+#
+#   .erab_accessibility()    -  TS 32.450 6.1.1 "E-RAB Accessibility"
+#   TODO other KPIs
+#
+# Upon construction specified time interval is potentially widened to cover
+# corresponding data in full granularity periods:
+#
+#                  Ï„'lo                  Ï„'hi
+#       ──────|─────|────[────|────)──────|──────|────────>
+#                    ←─ τ_lo      τ_hi ──→          time
+#
+#
+# See also: MeasurementLog, Measurement.
+class Calc:
+    # ._data            []Measurement - fully inside [.Ï„_lo, .Ï„_hi)
+    # [.Ï„_lo, .Ï„_hi)    time interval to compute over. Potentially wider than originally requested.
+    pass
+
+
 # MeasurementLog represent journal of performed Measurements.
 #
 # It semantically consists of
@@ -162,6 +200,18 @@ class Measurement(np.void):
     ])
 
 
+# Interval is NumPy structured scalar that represents [lo,hi) interval.
+#
+# It is used by Calc to represent confidence interval for computed KPIs.
+# NOTE Interval is likely to be transient solution and in the future its usage
+#      will be probably changed to something like uncertainties.ufloat .
+class Interval(np.void):
+    _dtype = np.dtype([
+        ('lo',  np.float64),
+        ('hi',  np.float64),
+    ])
+
+
 # ----------------------------------------
 # Measurement is the central part around which everything is organized.
 # Let's have it go first.
@@ -328,6 +378,250 @@ def forget_past(mlog, Tcut):
 # ----------------------------------------
 
 
+# Calc() is initialized from slice of data in the measurement log that is
+# covered/overlapped with [Ï„_lo, Ï„_hi) time interval.
+#
+# The time interval, that will actually be used for computations, is potentially wider.
+# See Calc class documentation for details.
+@func(Calc)
+def __init__(calc, mlog: MeasurementLog, Ï„_lo, Ï„_hi):
+    assert Ï„_lo <= Ï„_hi
+    data = mlog.data()
+    l = len(data)
+
+    # find min i: τ_lo < [i].(Tstart+δT)    ; i=l if not found
+    # TODO binary search
+    i = 0
+    while i < l:
+        m = data[i]
+        m_τhi = m['X.Tstart'] + m['X.δT']
+        if Ï„_lo < m_Ï„hi:
+            break
+        i += 1
+
+    # find min j: τ_hi ≤ [j].Tstart         ; j=l if not found
+    j = i
+    while j < l:
+        m = data[j]
+        m_Ï„lo = m['X.Tstart']
+        if Ï„_hi <= m_Ï„lo:
+            break
+        j += 1
+
+    data = data[i:j]
+    if len(data) > 0:
+        m_lo = data[0]
+        m_hi = data[-1]
+        Ï„_lo = min(Ï„_lo, m_lo['X.Tstart'])
+        τ_hi = max(τ_hi, m_hi['X.Tstart']+m_hi['X.δT'])
+
+    calc._data = data
+    calc.Ï„_lo  = Ï„_lo
+    calc.Ï„_hi  = Ï„_hi
+
+
+# erab_accessibility computes "E-RAB Accessibility" KPI.
+#
+# It returns the following items:
+#
+#   - InitialEPSBEstabSR        probability of successful initial    E-RAB establishment    (%)
+#   - AddedEPSBEstabSR          probability of successful additional E-RAB establishment    (%)
+#
+# The items are returned as Intervals with information about confidence for
+# computed values.
+#
+# 3GPP reference: TS 32.450 6.1.1 "E-RAB Accessibility".
+@func(Calc)
+def erab_accessibility(calc): # -> InitialEPSBEstabSR, AddedEPSBEstabSR
+    SR = calc._success_rate
+
+    x = SR("Σcause RRC.ConnEstabSucc.CAUSE",
+           "Σcause RRC.ConnEstabAtt.CAUSE")
+
+    y = SR("S1SIG.ConnEstabSucc",
+           "S1SIG.ConnEstabAtt")
+
+    z = SR("Σqci ERAB.EstabInitSuccNbr.QCI",
+           "Σqci ERAB.EstabInitAttNbr.QCI")
+
+    InititialEPSBEstabSR = Interval(x['lo'] * y['lo'] * z['lo'],    # x·y·z
+                                    x['hi'] * y['hi'] * z['hi'])
+
+    AddedEPSBEstabSR = SR("Σqci ERAB.EstabAddSuccNbr.QCI",
+                          "Σqci ERAB.EstabAddAttNbr.QCI")
+
+    return _i2pc(InititialEPSBEstabSR), \
+           _i2pc(AddedEPSBEstabSR)          # as %
+
+
+# _success_rate computes success rate for fini/init events.
+#
+# i.e. ratio N(fini)/N(init).
+#
+# 3GPP defines success rate as N(successful-events) / N(total_events) ratio,
+# for example N(connection_established) / N(connection_attempt). We take this
+# definition as is for granularity periods with data, and extend it to also
+# account for time intervals covered by Calc where measurements results are not
+# available.
+#
+# To do so we extrapolate N(init) to be also contributed by "no data" periods
+# proportionally to "no data" time coverage, and then we note that in those
+# times, since no measurements have been made, the number of success events is
+# unknown and can lie anywhere in between 0 and the number of added init events.
+#
+# This gives the following for resulting success rate confidence interval:
+#
+# time covered by periods with data:                    Σt
+# time covered by periods with no data:                 t⁺      t⁺
+# extrapolation for incoming initiation events:         init⁺ = ──·Σ(init)
+#                                                               Σt
+# fini events for "no data" time is full uncertainty:   fini⁺ ∈ [0,init⁺]
+#
+# => success rate over whole time is uncertain in between
+#
+#           Σ(fini)              Σ(fini) + init⁺
+#       ──────────────   ≤ SR ≤  ──────────────
+#       Σ(init) + init⁺          Σ(init) + init⁺
+#
+# that confidence interval is returned as the result.
+#
+# fini/init events can be prefixed with "Σqci " or "Σcause ". If such prefix is
+# present, then fini/init value is obtained via call to Σqci or Σcause correspondingly.
+@func(Calc)
+def _success_rate(calc, fini, init): # -> Interval in [0,1]
+    def vget(m, name):
+        if name.startswith("Σqci "):
+            return Σqci  (m, name[len("Σqci "):])
+        if name.startswith("Σcause "):
+            return Σcause(m, name[len("Σcause "):])
+        return m[name]
+
+    t_     = 0.
+    Σt     = 0.
+    Σinit  = 0
+    Σfini  = 0
+    Σufini = 0  # Σinit where fini=ø but init is not ø
+    for m in calc._miter():
+        τ = m['X.δT']
+        vinit = vget(m, init)
+        vfini = vget(m, fini)
+        if isNA(vinit):
+            t_ += Ï„
+            # ignore fini, even if it is not ø.
+            # TODO more correct approach: init⁺ for this period ∈ [fini,∞] and
+            # once we extrapolate init⁺ we should check if it lies in that
+            # interval and adjust if not. Then fini could be used as is.
+        else:
+            Σt += τ
+            Σinit += vinit
+            if isNA(vfini):
+                Σufini += vinit
+            else:
+                Σfini += vfini
+
+    if Σinit == 0 or Σt == 0:
+        return Interval(0,1)    # full uncertainty
+
+    init_ = t_ * Σinit / Σt
+    a =  Σfini                   / (Σinit + init_)
+    b = (Σfini + init_ + Σufini) / (Σinit + init_)
+    return Interval(a,b)
+
+
+# _miter iterates through [.Ï„_lo, .Ï„_hi) yielding Measurements.
+#
+# The measurements are yielded with consecutive timestamps. There is no gaps
+# as NA Measurements are yielded for time holes in original MeasurementLog data.
+@func(Calc)
+def _miter(calc): # -> iter(Measurement)
+    Ï„ = calc.Ï„_lo
+    l = len(calc._data)
+    i = 0  # current Measurement from data
+
+    while i < l:
+        m = calc._data[i]
+        m_Ï„lo = m['X.Tstart']
+        m_τhi = m_τlo + m['X.δT']
+        assert m_Ï„lo < m_Ï„hi
+
+        if Ï„ < m_Ï„lo:
+            # <- M(ø)[τ, m_τlo)
+            h = Measurement()
+            h['X.Tstart'] = Ï„
+            h['X.δT']     = m_τlo - τ
+            yield h
+
+        # <- M from mlog
+        yield m
+
+        Ï„ = m_Ï„hi
+        i += 1
+
+    assert Ï„ <= calc.Ï„_hi
+    if Ï„ < calc.Ï„_hi:
+        # <- trailing M(ø)[τ, τ_hi)
+        h = Measurement()
+        h['X.Tstart'] = Ï„
+        h['X.δT']     = calc.τ_hi - τ
+        yield h
+
+
+# Interval(lo,hi) creates new interval with specified boundaries.
+@func(Interval)
+def __new__(cls, lo, hi):
+    i = _newscalar(cls, cls._dtype)
+    i['lo'] = lo
+    i['hi'] = hi
+    return i
+
+
+# Σqci performs summation over all qci for m[name_qci].
+#
+# usage example:
+#
+#   Σqci(m, 'ERAB.EstabInitSuccNbr.QCI')
+#
+# name_qci must have '.QCI' suffix.
+def Σqci(m: Measurement, name_qci: str):
+    return _Σx(m, name_qci, _all_qci)
+
+# Σcause, performs summation over all causes for m[name_cause].
+#
+# usage example:
+#
+#   Σcause(m, 'RRC.ConnEstabSucc.CAUSE')
+#
+# name_cause must have '.CAUSE' suffix.
+def Σcause(m: Measurement, name_cause: str):
+    return _Σx(m, name_cause, _all_cause)
+
+# _Σx serves Σqci and Σcause.
+def _Σx(m: Measurement, name_x: str, _all_x: func):
+    name_sum, name_xv = _all_x(name_x)
+    s = m[name_sum]
+    if not isNA(s):
+        return s
+    s  = s.dtype.type(0)
+    ok = True
+    for _ in name_xv:
+        v = m[_]
+        # we don't know the answer even if single value is NA
+        # (if data source does not support particular qci/cause, it should set it to 0)
+        if isNA(v):
+            ok = False
+        else:
+            s += v
+    if not ok:
+        return NA(s.dtype)
+    else:
+        return s
+
+
+# _i2pc maps Interval in [0,1] to one in [0,100] by multiplying lo/hi by 1e2.
+def _i2pc(x: Interval): # -> Interval
+    return Interval(x['lo']*100, x['hi']*100)
+
+
 # _newscalar creates new NumPy scalar instance with specified type and dtype.
 def _newscalar(typ, dtype):
     _ = np.zeros(shape=(), dtype=(typ, dtype))
diff --git a/kpi_test.py b/kpi_test.py
index 7582d16..21814ad 100644
--- a/kpi_test.py
+++ b/kpi_test.py
@@ -18,7 +18,7 @@
 # See COPYING file for full licensing terms.
 # See https://www.nexedi.com/licensing for rationale and options.
 
-from xlte.kpi import MeasurementLog, Measurement, NA, isNA
+from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA
 import numpy as np
 from pytest import raises
 
@@ -145,6 +145,281 @@ def test_MeasurementLog():
     assert _.shape == (0,)
 
 
+# verify (Ï„_lo, Ï„_hi) widening and overlapping with Measurements on Calc initialization.
+def test_Calc_init():
+    mlog = MeasurementLog()
+
+    # _ asserts that Calc(mlog, Ï„_lo,Ï„_hi) has .Ï„_lo/.Ï„_hi as specified by
+    # Ï„_wlo/Ï„_whi, and ._data as specified by mokv.
+    def _(Ï„_lo, Ï„_hi, Ï„_wlo, Ï„_whi, *mokv):
+        c = Calc(mlog, Ï„_lo,Ï„_hi)
+        assert (c.Ï„_lo, c.Ï„_hi) == (Ï„_wlo, Ï„_whi)
+        mv = list(c._data[i] for i in range(len(c._data)))
+        assert mv == list(mokv)
+
+    # mlog(ø)
+    _( 0, 0,     0,0)
+    _( 0,99,     0,99)
+    _(10,20,    10,20)
+
+    # m1[10,20)
+    m1 = Measurement()
+    m1['X.Tstart'] = 10
+    m1['X.δT']     = 10
+    mlog.append(m1)
+
+    _( 0, 0,     0, 0)
+    _( 0,99,     0,99,  m1)
+    _(10,20,    10,20,  m1)
+    _(12,18,    10,20,  m1)
+    _( 5, 7,     5, 7)
+    _( 5,15,     5,20,  m1)
+    _(15,25,    10,25,  m1)
+    _(25,30,    25,30)
+
+    # m1[10,20) m2[30,40)
+    m2 = Measurement()
+    m2['X.Tstart'] = 30
+    m2['X.δT']     = 10
+    mlog.append(m2)
+
+    _( 0, 0,     0, 0)
+    _( 0,99,     0,99,  m1, m2)
+    _(10,20,    10,20,  m1)
+    _(12,18,    10,20,  m1)
+    _( 5, 7,     5, 7)
+    _( 5,15,     5,20,  m1)
+    _(15,25,    10,25,  m1)
+    _(25,30,    25,30)
+    _(25,35,    25,40,      m2)
+    _(35,45,    30,45,      m2)
+    _(45,47,    45,47)
+    _(32,38,    30,40,      m2)
+    _(30,40,    30,40,      m2)
+    _(99,99,    99,99)
+
+# verify Calc internal iteration over measurements and holes.
+def test_Calc_miter():
+    mlog = MeasurementLog()
+
+    # _ asserts that Calc(mlog, Ï„_lo,Ï„_hi)._miter yields Measurement as specified by mokv.
+    def _(Ï„_lo, Ï„_hi, *mokv):
+        c = Calc(mlog, Ï„_lo,Ï„_hi)
+        mv = list(c._miter())
+        assert mv == list(mokv)
+
+    # na returns Measurement with specified Ï„_lo/Ï„_hi and NA for all other data.
+    def na(Ï„_lo, Ï„_hi):
+        assert Ï„_lo <= Ï„_hi
+        m = Measurement()
+        m['X.Tstart']  = Ï„_lo
+        m['X.δT']      = τ_hi - τ_lo
+        return m
+
+    # mlog(ø)
+    _( 0, 0)
+    _( 0,99,    na(0,99))
+    _(10,20,    na(10,20))
+
+    # m1[10,20)
+    m1 = Measurement()
+    m1['X.Tstart'] = 10
+    m1['X.δT']     = 10
+    mlog.append(m1)
+
+    _( 0, 0)
+    _( 0,99,    na(0,10),  m1,  na(20,99))
+    _(10,20,               m1)
+    _( 7,20,    na(7,10),  m1)
+    _(10,23,               m1,  na(20,23))
+
+    # m1[10,20) m2[30,40)
+    m2 = Measurement()
+    m2['X.Tstart'] = 30
+    m2['X.δT']     = 10
+    mlog.append(m2)
+
+    _( 0, 0)
+    _( 0,99,    na(0,10),  m1,  na(20,30),  m2, na(40,99))
+    _(10,20,               m1)
+    _(10,30,               m1,  na(20,30))
+    _(10,40,               m1,  na(20,30),  m2)
+
+
+# verify Calc internal function that computes success rate of fini/init events.
+def test_Calc_success_rate():
+    mlog = MeasurementLog()
+
+    init = "S1SIG.ConnEstabAtt"
+    fini = "S1SIG.ConnEstabSucc"
+
+    # M returns Measurement with specified time coverage and init/fini values.
+    def M(Ï„_lo,Ï„_hi, vinit=None, vfini=None):
+        m = Measurement()
+        m['X.Tstart']  = Ï„_lo
+        m['X.δT']      = τ_hi - τ_lo
+        if vinit is not None:
+            m[init]    = vinit
+        if vfini is not None:
+            m[fini]    = vfini
+        return m
+
+    # Mlog reinitializes mlog according to specified Measurements in mv.
+    def Mlog(*mv):
+        nonlocal mlog
+        mlog = MeasurementLog()
+        for m in mv:
+            mlog.append(m)
+
+    # _ asserts that Calc(mlog, Ï„_lo,Ï„_hi)._success_rate(fini, init) returns Interval(sok_lo, sok_hi).
+    def _(Ï„_lo, Ï„_hi, sok_lo, sok_hi):
+        sok = Interval(sok_lo, sok_hi)
+        c = Calc(mlog, Ï„_lo, Ï„_hi)
+        s = c._success_rate(fini, init)
+        assert type(s) is Interval
+        eps = np.finfo(s['lo'].dtype).eps
+        assert abs(s['lo']-sok['lo'])  < eps
+        assert abs(s['hi']-sok['hi'])  < eps
+
+    # ø -> full uncertainty
+    Mlog()
+    _( 0, 0,     0,1)
+    _( 0,99,     0,1)
+    _(10,20,     0,1)
+
+    # m[10,20,  {ø,0}/{ø,0})    -> full uncertainty
+    for i in (None,0):
+        for f in (None,0):
+            Mlog(M(10,20,  i,f))
+            _( 0, 0,     0,1)
+            _( 0,99,     0,1)
+            _(10,20,     0,1)
+            _( 7,20,     0,1)
+            _(10,25,     0,1)
+
+    # m[10,20,  8,4)           -> 1/2 if counted in [10,20)
+    #
+    #         i₁=8
+    #         f₁=4
+    #   ────|──────|─────────────|──────────
+    #      10  t₁ 20 ←── t₂ ──→ τ_hi
+    #
+    # t with data:                      t₁
+    # t with no data:                   tâ‚‚
+    # t total:                          T = t₁+t₂
+    # extrapolation for incoming                tâ‚‚
+    # events for "no data" period:      i₂ = i₁·──
+    #                                           t₁
+    # termination events for "no data"
+    # period is full uncertainty        f₂ ∈ [0,i₂]
+    #
+    # => success rate over whole time is uncertain in between
+    #
+    #    f₁           f₁+i₂
+    #  ─────  ≤ SR ≤  ─────
+    #  i₁+i₂          i₁+i₂
+    #
+    Mlog(M(10,20, 8,4))
+    _( 0, 0,   0,                    1)                  # no overlap - full uncertainty
+    _(10,20,   0.5,                  0.5)                # tâ‚‚=0  - no uncertainty
+    _( 7,20,   0.3846153846153846,   0.6153846153846154) # tâ‚‚=3
+    _(10,25,   0.3333333333333333,   0.6666666666666666) # tâ‚‚=5
+    _( 0,99,   0.050505050505050504, 0.9494949494949495) # tâ‚‚=10+79
+
+    # m[10,20,  8,4)  m[30,40, 50,50]
+    #
+    # similar to the above case but with t₁ and t₂ coming with data, while t₃
+    # represents whole "no data" time:
+    #
+    #         i₁=8          i₂=50
+    #         f₁=4          f₂=50
+    #   ────|──────|──────|───────|──────────────────|──────────
+    #      10  t₁ 20  ↑  30  t₂  40       ↑         τ_hi
+    #                 │                   │
+    #                 │                   │
+    #                 `────────────────── t₃
+    #
+    # t with data:                      t₁+t₂
+    # t with no data:                   t₃
+    # t total:                          T = t₁+t₂+t₃
+    # extrapolation for incoming                      t₃
+    # events for "no data" period:      i₃ = (i₁+i₂)·────
+    #                                                t₁+t₂
+    # termination events for "no data"
+    # period is full uncertainty        f₃ ∈ [0,i₃]
+    #
+    # => success rate over whole time is uncertain in between
+    #
+    #    f₁+f₂           f₁+f₂+i₃
+    #  ────────  ≤ SR ≤  ───────
+    #  i₁+i₂+i₃          i₁+i₂+i₃
+    #
+    Mlog(M(10,20, 8,4), M(30,40, 50,50))
+    _( 0, 0,   0,                    1)                  # no overlap - full uncertainty
+    _(10,20,   0.5,                  0.5)                # exact 1/2 in [10,20)
+    _(30,40,   1,                    1)                  # exact  1  in [30,40)
+    _( 7,20,   0.3846153846153846,   0.6153846153846154) # overlaps only with t₁ -> as ^^^
+    _(10,25,   0.3333333333333333,   0.6666666666666666) # overlaps only with t₁ -> as ^^^
+    _(10,40,   0.6206896551724138,   0.9540229885057471) # t₃=10
+    _( 7,40,   0.5642633228840125,   0.9582027168234065) # t₃=13
+    _( 7,45,   0.4900181488203267,   0.9637023593466425) # t₃=18
+    _( 0,99,   0.18808777429467083,  0.9860675722744688) # t₃=79
+
+
+    # Σqci
+    init = "Σqci ERAB.EstabInitAttNbr.QCI"
+    fini = "Σqci ERAB.EstabInitSuccNbr.QCI"
+    m = M(10,20)
+    m['ERAB.EstabInitAttNbr.sum']  = 10
+    m['ERAB.EstabInitSuccNbr.sum'] = 2
+    Mlog(m)
+    _(10,20,    1/5, 1/5)
+
+    # Σcause
+    init = "Σcause RRC.ConnEstabAtt.CAUSE"
+    fini = "Σcause RRC.ConnEstabSucc.CAUSE"
+    m = M(10,20)
+    m['RRC.ConnEstabSucc.sum'] = 5
+    m['RRC.ConnEstabAtt.sum']  = 10
+    Mlog(m)
+    _(10,20,    1/2, 1/2)
+
+
+# verify Calc.erab_accessibility .
+def test_Calc_erab_accessibility():
+    # most of the job is done by _success_rate.
+    # here we verify final wrapping, that erab_accessibility does, only lightly.
+    m = Measurement()
+    m['X.Tstart'] = 10
+    m['X.δT']     = 10
+
+    m['RRC.ConnEstabSucc.sum']      = 2
+    m['RRC.ConnEstabAtt.sum']       = 7
+
+    m['S1SIG.ConnEstabSucc']        = 3
+    m['S1SIG.ConnEstabAtt']         = 8
+
+    m['ERAB.EstabInitSuccNbr.sum']  = 4
+    m['ERAB.EstabInitAttNbr.sum']   = 9
+
+    m['ERAB.EstabAddSuccNbr.sum']   = 5
+    m['ERAB.EstabAddAttNbr.sum']    = 10
+
+    mlog = MeasurementLog()
+    mlog.append(m)
+
+    calc = Calc(mlog, 10,20)
+
+    # _ asserts that provided interval is precise and equals vok.
+    def _(i: Interval, vok):
+        assert i['lo'] == i['hi']
+        assert i['lo'] == vok
+
+    InititialEPSBEstabSR, AddedEPSBEstabSR = calc.erab_accessibility()
+    _(AddedEPSBEstabSR,     50)
+    _(InititialEPSBEstabSR, 100 * 2*3*4 / (7*8*9))
+
+
 def test_NA():
     def _(typ):
         return NA(typ(0).dtype)
-- 
2.30.9