Commit 499a7c1b authored by Kirill Smelkov

amari.kpi: Teach LogMeasure to handle x.drb_stats messages

This patch provides the next building block for the E-UTRAN IP Throughput KPI
and continues

    d102ffaa (drb: Start of the package)
    5bf7dc1c (amari.{drb,xlog}: Provide aggregated DRB statistics in the form of synthetic x.drb_stats message)

Quoting those patches

    The scheme to compute E-UTRAN IP Throughput is thus as follows: poll eNB at
    100Hz frequency for `ue_get[stats]` and retrieve information about per-UE/QCI
    streams and the number of transport blocks dl/ul-ed to the UE in question
    during that 10ms frame. Estimate `tx_time` taking into account
    the number of transmitted transport blocks. And estimate whether eNB is congested or
    not based on `dl_use_avg`/`ul_use_avg` taken from `stats`. For the latter we
    also need to poll for `stats` at 100Hz frequency and synchronize
    `ue_get[stats]` and `stats` requests in time so that they both cover the same
    time interval of particular frame.

    Then organize the polling process to provide aggregated statistics in the form of
    new `x.drb_stats` message, and teach `xamari xlog` to save that messages to
    `enb.xlog` together with `stats`.

    Then further adjust `amari.kpi.LogMeasure`						<-- NOTE
    and generic `kpi.Measurement` and `kpi.Calc` to handle DRB-related data.

So here we implement the noted step:

We teach LogMeasure to take x.drb_stats messages into account and update IP
Throughput related fields in appropriate Measurement from x.drb_stats
data.
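
As an illustration of how the "appropriate Measurement" is chosen: the diff
below assigns an x.drb_stats entry to the current or to the next measurement
depending on which of them it overlaps by at least 50% of its own time span,
trying the current one first. The following is a minimal standalone sketch of
that rule only; `overlap`, `pick_measurement` and their arguments are
hypothetical names used for illustration and are not part of xlte.

```python
import math

def overlap(a_lo, a_hi, b_lo, b_hi):
    """Length of the intersection of [a_lo, a_hi) and [b_lo, b_hi)."""
    return max(0.0, min(a_hi, b_hi) - max(a_lo, b_lo))

def pick_measurement(t_lo, t_hi, m_span, m_next_start):
    """Decide which measurement a drb_stats sample covering [t_lo, t_hi) updates.

    m_span       -- (start, end) of the current measurement, or None
    m_next_start -- start of the next measurement (its end is not known yet), or None
    Returns 'm', 'm_next' or None.
    """
    dt = t_hi - t_lo
    if not (dt > 0):
        return None
    if m_span is not None:
        m_lo, m_hi = m_span
        # ">=" so that a sample overlapping by exactly 50% is not skipped
        if overlap(t_lo, t_hi, m_lo, m_hi) >= dt / 2:
            return 'm'
    if m_next_start is not None:
        # the next measurement is treated as open-ended
        if overlap(t_lo, t_hi, m_next_start, math.inf) >= dt / 2:
            return 'm_next'
    return None
```

With a current measurement covering [0, 1) and the next one starting at 1, a
sample covering [0.75, 1.25) goes to the current measurement (exactly 50%
overlap), while a sample covering [0.875, 1.25) goes to the next one.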

This process is relatively straightforward except for one place: for stable
output, E-UTRAN IP Throughput is required to be computed without taking
into account the last TTI of every sample. We don't have that level of
detail, since all we have is the total amount of transmitted bytes in a
burst and an estimate of how long the burst lasts. Thus we can only
provide an estimate of the E-UTRAN IP Throughput, as follows:

    DRB.IPVol and DRB.IPTime are collected to compute throughput.

    thp = ΣB*/ΣT*  where B* is tx'ed bytes in the sample without taking last tti into account
                   and   T* is time of tx also without taking that sample's tail tti.

    we only know ΣB (whole amount of tx), ΣT and ΣT* with some error.

    -> thp can be estimated to be inside the following interval:

             ΣB            ΣB
            ───── ≤ thp ≤ ─────           (1)
            ΣT_hi         ΣT*_lo

    the upper layer in xlte.kpi will use the following formula for
    final throughput calculation:

                  DRB.IPVol
            thp = ──────────              (2)
                  DRB.IPTime

    -> set DRB.IPTime and its error to mean and δ of ΣT_hi and ΣT*_lo
    so that (2) becomes (1).

For this to work we also need to introduce new fields to Measurement
that represent the error of DRB.IPTime. The hope is that these fields are
temporary and will be removed once we rework DRB stats to provide B*
and T* directly.
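
A small worked sketch of the estimation described above, under stated
assumptions: `iptime_estimate` mirrors the mean/δ computation done in the diff
below, while `thp_interval` is only a guess at how an upper layer could turn
DRB.IPVol, DRB.IPTime and its error back into interval (1). Both function
names are hypothetical and do not exist in xlte.

```python
def iptime_estimate(tx_time, tx_time_err, tx_time_notailtti, tx_time_notailtti_err):
    """Return (IPTime, IPTime_err) as mean and half-width of [ΣT*_lo, ΣT_hi]."""
    T_hi  = tx_time + tx_time_err                      # ΣT_hi
    TT_lo = tx_time_notailtti - tx_time_notailtti_err  # ΣT*_lo
    return (T_hi + TT_lo) / 2, (T_hi - TT_lo) / 2

def thp_interval(ipvol_bits, iptime, iptime_err):
    """Return (thp_lo, thp_hi) in bit/s; assumes iptime - iptime_err > 0."""
    return ipvol_bits / (iptime + iptime_err), ipvol_bits / (iptime - iptime_err)

# 525 bytes sent; ΣT = 1.5±0.25 s, ΣT* = 1.0±0.25 s
iptime, iptime_err = iptime_estimate(1.5, 0.25, 1.0, 0.25)
print(iptime, iptime_err)                         # 1.25 0.5
print(thp_interval(8 * 525, iptime, iptime_err))  # (2400.0, 5600.0)
```

Here ΣT_hi = 1.75 s and ΣT*_lo = 0.75 s, so IPTime ± error spans exactly that
range, and dividing the volume by the two ends reproduces the bounds of (1).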
parent fd7870f4
@@ -27,7 +27,10 @@
 information is emitted in the form of synthetic x.drb_stats message whose
 generation is integrated into amari.xlog package.

-See the following related 3GPP standards references:
+Please see amari.kpi package that turns x.drb_stats data into
+measurements related to E-UTRAN IP Throughput KPI.
+
+See also the following related 3GPP standards references:

 - TS 32.450 6.3.1 "E-UTRAN IP Throughput"
 - TS 32.425 4.4.6 "IP Throughput measurements"
......
@@ -51,6 +51,8 @@ class LogMeasure:
     # \/ None
     # ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None
     # ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None
+    #
+    # ._drb_stats last xlog.Message with x.drb_stats | None ; reset on error|event
     pass
@@ -66,6 +68,7 @@ def __init__(logm, rxlog, rlog):
     logm._estats = None
     logm._m = None
     logm._m_next = None
+    logm._drb_stats = None

 # close releases resources associated with LogMeasure and closes underlying readers.
 @func(LogMeasure)
@@ -109,6 +112,7 @@ def _read(logm):
         _trace('._m: \t', logm._m)
         _trace('._estats:\t', logm._estats)
         _trace('._m_next:\t', logm._m_next)
+        _trace('._drb_stats:\t', logm._drb_stats)

         if m is not None:
             return m
@@ -151,6 +155,9 @@ def _read(logm):
         # handle messages that update current Measurement
         if isinstance(x, xlog.Message):
+            if x.message == "x.drb_stats":
+                logm._handle_drb_stats(x)
+                continue
             if x.message != "stats":
                 continue # ignore other messages
@@ -175,6 +182,7 @@ def _read(logm):
         if isinstance(x, (xlog.Event, LogError)):
             logm._estats = x # it is ok to forget previous event after e.g. bad line with ParseError
+            logm._drb_stats = None # reset ._drb_stats at an error or event
             continue # flush the queue

         assert isinstance(x, xlog.Message)
@@ -353,6 +361,109 @@ def _stats_cc(stats: xlog.Message, counter: str):
     return cc_dict['messages'].get(counter, 0)


+# _handle_drb_stats handles next x.drb_stats xlog entry upon _read request.
+@func(LogMeasure)
+def _handle_drb_stats(logm, drb_stats: xlog.Message):
+    # TODO precheck for correct message structure similarly to _stats_check
+
+    drb_stats_prev = logm._drb_stats
+    logm._drb_stats = drb_stats
+
+    # first drb_stats after an event - we don't know which time period it covers
+    if drb_stats_prev is None:
+        return
+
+    assert isinstance(drb_stats_prev, xlog.Message)
+    assert drb_stats_prev.message == "x.drb_stats"
+
+    # time coverage for current drb_stats
+    τ_lo = drb_stats_prev.timestamp
+    τ_hi = drb_stats.timestamp
+    δτ = τ_hi - τ_lo
+
+    # see with which ._m or ._m_next, if any, drb_stats overlaps with ≥ 50% of
+    # time first, and update that measurement correspondingly.
+    if not (δτ > 0):
+        return
+
+    if logm._m is not None:
+        m_lo = logm._m['X.Tstart']
+        m_hi = m_lo + logm._m['X.δT']
+
+        d = max(0, min(τ_hi, m_hi) -
+                   max(τ_lo, m_lo))
+        if d >= δτ/2: # NOTE ≥ 50%, not > 50% not to skip drb_stats if fill is exactly 50%
+            _drb_update(logm._m, drb_stats)
+            return
+
+    if logm._m_next is not None:
+        n_lo = logm._m_next['X.Tstart']
+        # n_hi - don't know as _m_next['X.δT'] is ø yet
+
+        d = max(0, τ_hi -
+                   max(τ_lo, n_lo))
+        if d >= δτ/2:
+            _drb_update(logm._m_next, drb_stats)
+            return
+
+# _drb_update updates Measurement from dl/ul DRB statistics related to measurement's time coverage.
+def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message):
+    # TODO Exception -> LogError("internal failure") similarly to _handle_stats
+    qci_trx = drb_stats.get1("qci_dict", dict)
+
+    for dir in ('dl', 'ul'):
+        qvol = m['DRB.IPVol%s.QCI' % dir.capitalize()]
+        qtime = m['DRB.IPTime%s.QCI' % dir.capitalize()]
+        qtime_err = m['XXX.DRB.IPTime%s_err.QCI' % dir.capitalize()]
+
+        # qci_dict carries entries only for qci's with non-zero values, but if
+        # we see drb_stats we know we have information for all qcis.
+        # -> pre-initialize to zero everything
+        if kpi.isNA(qvol).all(): qvol[:] = 0
+        if kpi.isNA(qtime).all(): qtime[:] = 0
+        if kpi.isNA(qtime_err).all(): qtime_err[:] = 0
+
+        for qci_str, trx in qci_trx.items():
+            qci = int(qci_str)
+
+            # DRB.IPVol and DRB.IPTime are collected to compute throughput.
+            #
+            # thp = ΣB*/ΣT*  where B* is tx'ed bytes in the sample without taking last tti into account
+            #                and   T* is time of tx also without taking that sample's tail tti.
+            #
+            # we only know ΣB (whole amount of tx), ΣT and ΣT* with some error.
+            #
+            # -> thp can be estimated to be inside the following interval:
+            #
+            #          ΣB            ΣB
+            #         ───── ≤ thp ≤ ─────           (1)
+            #         ΣT_hi         ΣT*_lo
+            #
+            # the upper layer in xlte.kpi will use the following formula for
+            # final throughput calculation:
+            #
+            #               DRB.IPVol
+            #         thp = ──────────              (2)
+            #               DRB.IPTime
+            #
+            # -> set DRB.IPTime and its error to mean and δ of ΣT_hi and ΣT*_lo
+            # so that (2) becomes (1).
+
+            # FIXME we account whole PDCP instead of only IP traffic
+            ΣB = trx['%s_tx_bytes' % dir]
+            ΣT = trx['%s_tx_time' % dir]
+            ΣT_err = trx['%s_tx_time_err' % dir]
+            ΣTT = trx['%s_tx_time_notailtti' % dir]
+            ΣTT_err = trx['%s_tx_time_notailtti_err' % dir]
+
+            ΣT_hi = ΣT + ΣT_err
+            ΣTT_lo = ΣTT - ΣTT_err
+
+            qvol[qci] = 8*ΣB # in bits
+            qtime[qci] = (ΣT_hi + ΣTT_lo) / 2
+            qtime_err[qci] = (ΣT_hi - ΣTT_lo) / 2
+
+
 # LogError(timestamp|None, *argv).
 @func(LogError)
 def __init__(e, τ, *argv):
......
@@ -19,9 +19,9 @@
 # See https://www.nexedi.com/licensing for rationale and options.

 from xlte.amari.kpi import LogMeasure, LogError, _trace as trace
-from xlte.kpi import Measurement
+from xlte.kpi import Measurement, isNA
 from golang import func, defer, b
-import io, json
+import io, json, re
 from pytest import raises
@@ -91,6 +91,24 @@ class tLogMeasure:
     def expect1(t, field, vok):
         if t._mok is None:
             t._mok_init()
+
+        # if a particular X.QCI[qci] is expected - default all other qcis to 0
+        _ = re.match(r"^(.*)\.([0-9]+)$", field)
+        if _ is not None:
+            farr = "%s.QCI" % _.group(1)
+            if isNA(t._mok[farr]).all():
+                t._mok[farr][:] = 0
+
+            # also automatically initialize XXX.DRB.IPTimeX_err to 0.01 upon seeing DRB.IPTimeX
+            # ( in tests we use precise values for tx_time and tx_time_notailtti
+            #   with δ=0.02 - see drb_trx and jdrb_stats)
+            n = _.group(1)
+            if n.startswith('DRB.IPTime'):
+                ferr = "XXX.%s_err" % n
+                if isNA(t._mok[ferr+'.QCI']).all():
+                    t._mok[ferr+'.QCI'][:] = 0
+                t._mok["%s.%s" % (ferr, _.group(2))] = ((vok + 0.01) - (vok - 0.01)) / 2 # ≈ 0.01
+
         t._mok[field] = vok

     # expect_nodata requests to verify all fields besides timestamp-related to be NA.
@@ -183,6 +201,18 @@ def test_LogMeasure():
         τ_logm += 1
         counters_prev = {} # reset

+    # tdrb_stats is the verb to verify handling of x.drb_stats message.
+    #
+    # it xlogs drb stats with given δτ relative to either previous (δτ > 0) or
+    # next (δτ < 0) stats or event.
+    def tdrb_stats(δτ, qci_trx):
+        if δτ >= 0:
+            τ = τ_xlog + δτ # after previous stats or event
+        else:
+            τ = τ_xlog+1 + δτ # before next stats or event
+        trace('\n>>> tdrb_stats τ: %s τ_xlog: %s τ_logm: %s' % (τ, τ_xlog, τ_logm))
+        t.xlog( jdrb_stats(τ, qci_trx) )
+

     # further empty stats
@@ -266,6 +296,70 @@ def test_LogMeasure():
     _('ERAB.EstabAddSuccNbr.sum', 2)

+    # DRB.IPVol / DRB.IPTime (testing all variants of stats/x.drb_stats interaction)
+    tδstats({})
+    tδstats({})  # ──S₁·d₁─────S₂·d₂─────S₃·d₃──
+    tdrb_stats(+0.1, {1:  drb_trx(1.1,10, 1.2,20),
+                      11: drb_trx(1.3,30, 1.4,40)})
+    # nothing here - d₁ comes as the first drb_stats
+
+    tδstats({})  # S₂
+    tdrb_stats(+0.1, {2:  drb_trx(2.1,100, 2.2,200),  # d₂ is included into S₁-S₂
+                      22: drb_trx(2.3,300, 2.4,400)})
+    _('DRB.IPTimeDl.2', 2.1); _('DRB.IPVolDl.2', 8*100)
+    _('DRB.IPTimeUl.2', 2.2); _('DRB.IPVolUl.2', 8*200)
+    _('DRB.IPTimeDl.22', 2.3); _('DRB.IPVolDl.22', 8*300)
+    _('DRB.IPTimeUl.22', 2.4); _('DRB.IPVolUl.22', 8*400)
+
+    tδstats({})  # S₃
+    tdrb_stats(+0.1, {3:  drb_trx(3.1,1000, 3.2,2000),  # d₃ is included into S₂-S₃
+                      33: drb_trx(3.3,3000, 3.4,4000)})
+    _('DRB.IPTimeDl.3', 3.1); _('DRB.IPVolDl.3', 8*1000)
+    _('DRB.IPTimeUl.3', 3.2); _('DRB.IPVolUl.3', 8*2000)
+    _('DRB.IPTimeDl.33', 3.3); _('DRB.IPVolDl.33', 8*3000)
+    _('DRB.IPTimeUl.33', 3.4); _('DRB.IPVolUl.33', 8*4000)
+
+    tdrb_stats(-0.1, {1: drb_trx(1.1,11, 1.2,12)})  # ──S·d─────d·S─────d·S──
+    tδstats({})  # cont↑
+    _('DRB.IPTimeDl.1', 1.1); _('DRB.IPVolDl.1', 8*11)
+    _('DRB.IPTimeUl.1', 1.2); _('DRB.IPVolUl.1', 8*12)
+
+    tdrb_stats(-0.1, {2: drb_trx(2.1,21, 2.2,22)})
+    tδstats({})
+    _('DRB.IPTimeDl.2', 2.1); _('DRB.IPVolDl.2', 8*21)
+    _('DRB.IPTimeUl.2', 2.2); _('DRB.IPVolUl.2', 8*22)
+
+    tdrb_stats(-0.1, {3: drb_trx(3.1,31, 3.2,32)})  # ──d·S─────d·S─────d·S·d──
+    tδstats({})  # cont↑
+    _('DRB.IPTimeDl.3', 3.1); _('DRB.IPVolDl.3', 8*31)
+    _('DRB.IPTimeUl.3', 3.2); _('DRB.IPVolUl.3', 8*32)
+
+    tdrb_stats(-0.1, {4: drb_trx(4.1,41, 4.2,42)})
+    tδstats({})
+    tdrb_stats(+0.1, {5: drb_trx(5.1,51, 5.2,52)})
+    _('DRB.IPTimeDl.4', 4.1); _('DRB.IPVolDl.4', 8*41)
+    _('DRB.IPTimeUl.4', 4.2); _('DRB.IPVolUl.4', 8*42)
+    _('DRB.IPTimeDl.5', 5.1); _('DRB.IPVolDl.5', 8*51)
+    _('DRB.IPTimeUl.5', 5.2); _('DRB.IPVolUl.5', 8*52)
+
+    tdrb_stats(+0.5, {6: drb_trx(6.1,61, 6.2,62)})  # ──d·S·d──d──S───d──S──
+    tδstats({})  # cont↑
+    _('DRB.IPTimeDl.6', 6.1); _('DRB.IPVolDl.6', 8*61)
+    _('DRB.IPTimeUl.6', 6.2); _('DRB.IPVolUl.6', 8*62)
+
+    tdrb_stats(+0.51,{7: drb_trx(7.1,71, 7.2,72)})
+    tδstats({})
+    _('DRB.IPTimeDl.7', 7.1); _('DRB.IPVolDl.7', 8*71)
+    _('DRB.IPTimeUl.7', 7.2); _('DRB.IPVolUl.7', 8*72)
+
+    tdrb_stats(-0.1, {8: drb_trx(8.1,81, 8.2,82)})  # combined d + S with nonzero counters
+    tδstats({'s1_initial_context_setup_request': +3,   # d──S────d·S──
+             's1_initial_context_setup_response': +2})  # cont↑
+    _('DRB.IPTimeDl.8', 8.1); _('DRB.IPVolDl.8', 8*81)
+    _('DRB.IPTimeUl.8', 8.2); _('DRB.IPVolUl.8', 8*82)
+    _('S1SIG.ConnEstabAtt', 3)
+    _('S1SIG.ConnEstabSucc', 2)
+    _('ERAB.EstabInitAttNbr.sum', 3)  # currently same as S1SIG.ConnEstab
+    _('ERAB.EstabInitSuccNbr.sum', 2)  # ----//----
+
     # service detach/attach, connect failure, xlog failure
     tδstats({}) # untie from previous history
     i, f = 'rrc_connection_request', 'rrc_connection_setup_complete'
@@ -469,6 +563,52 @@ def test_jstats():
             '{"message": "stats", "utc": 123.4, "cells": {"1": {"counters": {"messages": {"rrc_x": 1, "rrc_z": 3}}}}, "counters": {"messages": {"s1_y": 2, "x2_zz": 4}}}'


+# jdrb_stats, similarly to jstats, returns json-encoded x.drb_stats message
+# corresponding to per-QCI dl/ul tx_time/tx_bytes.
+def jdrb_stats(τ, qci_dlul): # -> str
+    qci_dlul = qci_dlul.copy()
+    for qci, dlul in qci_dlul.items():
+        assert isinstance(dlul, dict)
+        assert set(dlul.keys()) == {"dl_tx_bytes", "dl_tx_time", "dl_tx_time_notailtti",
+                                    "ul_tx_bytes", "ul_tx_time", "ul_tx_time_notailtti"}
+        dlul["dl_tx_time_err"] = 0            # original time is simulated to be
+        dlul["ul_tx_time_err"] = 0            # measured precisely in tests.
+        dlul["dl_tx_time_notailtti_err"] = 0  # ----//----
+        dlul["ul_tx_time_notailtti_err"] = 0  #
+
+    s = {
+        "message": "x.drb_stats",
+        "utc": τ,
+        "qci_dict": qci_dlul,
+    }
+
+    return json.dumps(s)
+
+def test_jdrb_stats():
+    # NOTE json encodes the 5 and 9 keys as strings, not integers
+    x = 0.01
+    assert jdrb_stats(100, {5: drb_trx(0.1,1234, 0.2,4321),
+                            9: drb_trx(1.1,7777, 1.2,8888)}) == ( \
+        '{"message": "x.drb_stats", "utc": 100, "qci_dict":' + \
+        ' {"5": {"dl_tx_bytes": 1234, "dl_tx_time": %(0.1+x)s, "dl_tx_time_notailtti": %(0.1-x)s,' + \
+        ' "ul_tx_bytes": 4321, "ul_tx_time": %(0.2+x)s, "ul_tx_time_notailtti": %(0.2-x)s,' + \
+        ' "dl_tx_time_err": 0, "ul_tx_time_err": 0, "dl_tx_time_notailtti_err": 0, "ul_tx_time_notailtti_err": 0},' + \
+        ' "9": {"dl_tx_bytes": 7777, "dl_tx_time": 1.11, "dl_tx_time_notailtti": 1.09,' + \
+        ' "ul_tx_bytes": 8888, "ul_tx_time": 1.21, "ul_tx_time_notailtti": 1.19,' + \
+        ' "dl_tx_time_err": 0, "ul_tx_time_err": 0, "dl_tx_time_notailtti_err": 0, "ul_tx_time_notailtti_err": 0}' + \
+        '}}') % {
+            '0.1-x': 0.1-x, '0.1+x': 0.1+x, # working-around float impreciseness
+            '0.2-x': 0.2-x, '0.2+x': 0.2+x,
+        }
+
+
+# drb_trx returns dict describing dl/ul transmissions of a data radio bearer.
+# such dict is used as per-QCI entry in x.drb_stats
+def drb_trx(dl_tx_time, dl_tx_bytes, ul_tx_time, ul_tx_bytes):
+    return {"dl_tx_bytes": dl_tx_bytes, "dl_tx_time": dl_tx_time + 0.01, "dl_tx_time_notailtti": dl_tx_time - 0.01,
+            "ul_tx_bytes": ul_tx_bytes, "ul_tx_time": ul_tx_time + 0.01, "ul_tx_time_notailtti": ul_tx_time - 0.01}
+
+
 # ionone returns empty data source.
 def ionone():
     return io.BytesIO(b'')
@@ -183,6 +183,8 @@ class Measurement(np.void):
         ('DRB.IPVolUl.QCI', np.int64), # bit 4.4.6.4 32.450:6.3.1 NOTE not kbit
         ('DRB.IPTimeDl.QCI', Ttime), # s 4.4.6.5 32.450:6.3.1 NOTE not ms
         ('DRB.IPTimeUl.QCI', Ttime), # s 4.4.6.6 32.450:6.3.1 NOTE not ms
+        ('XXX.DRB.IPTimeDl_err.QCI', Ttime), # s XXX error for DRB.IPTimeDl.QCI (will be removed)
+        ('XXX.DRB.IPTimeUl_err.QCI', Ttime), # s XXX error for DRB.IPTimeUl.QCI (will be removed)

         ('RRU.CellUnavailableTime.CAUSE', Ttime), # s 4.5.6
......