Commit 88003d9c authored by Kirill Smelkov's avatar Kirill Smelkov

kpi: Establish data model for DRB.IPLatDl.QCI

Since the beginning - since dc1d5481 (kpi: Start of the package)
DRB.IPLatDl.QCI was introduced only in commented form with the following
remark:

        # XXX mean is not good for our model
        # TODO mean -> total + npkt?
        #('DRB.IPLatDl.QCI',                Ttime),     # s         4.4.5.1  32.450:6.3.2   NOTE not ms

The problem here is that if we introduce DRB.IPLatDl.QCI as just Ttime for
average latency, and we have two measurements m1 and m2 with such
DRB.IPLatDl, there is no way to know what DRB.IPLatDl should be
for aggregated measurement - in the aggregated measurement the latency
should be the mean time - averaged for combined periods of m1+m2, over
samples of all transmision bursts. And knowing something already averaged in
period1 and period2 we can compute the average for aggregated measurement
only if we know both initial averages _and_ the number of samples in each
period. That's what the "TODO mean -> total + npkt?" comment was about.

Besides DRB.IPLatDl there are many other values that 3GPP say to be
mean. For example UE.Active and other values. So there is a need to
uniformly represent such averages somehow and that there is a way to
also aggregate the averages for combined measurements.

-> Introduce Stat type, that represents results of statistical profiling
   and use it for DRB.IPLatDl.QCI; Teach Calc.aggregate to handle
   aggregation of such statistical profiles via

            a₁⋅n₁ + a₂·n₂
	A = ─────────────
               n₁ + n₂

   formula.
parent 205616f2
......@@ -121,6 +121,23 @@ class MeasurementLog:
pass
# Stat[dtype] represents result of statistical profiling with arbitrary sampling
# for a value with specified dtype.
#
# It is organized as NumPy structured scalar with avg, min, max and n fields.
#
# It is used inside Measurement for e.g. DRB.IPLatDl.QCI .
class Stat(np.void):
# _dtype_for returns dtype that Stat[dtype] will use.
@classmethod
def _dtype_for(cls, dtype):
return np.dtype((cls, [
('avg', np.float64), # NOTE even int becomes float on averaging
('min', dtype),
('max', dtype),
('n', np.int64)]))
# Measurement represents set of measured values and events observed and counted
# during one particular period of time.
#
......@@ -157,6 +174,7 @@ class MeasurementLog:
class Measurement(np.void):
Tcc = np.int32 # cumulative counter
Ttime = np.float64 # time is represented in seconds since epoch
S = Stat ._dtype_for # statistical profile with arbitrary sampling
# _dtype defines measured values and events.
_dtype = np.dtype([
......@@ -166,7 +184,7 @@ class Measurement(np.void):
# below come values/events as specified by TS 32.425 and TS 32.450
# NOTE all .QCI and .CAUSE are expanded from outside.
#
# NAME TYPE UNIT TS 32.425 reference + ...
# NAME TYPE/DTYPE UNIT TS 32.425 reference + ...
('RRC.ConnEstabAtt.CAUSE', Tcc), # 1 4.1.1.1
('RRC.ConnEstabSucc.CAUSE', Tcc), # 1 4.1.1.2
......@@ -181,9 +199,8 @@ class Measurement(np.void):
('DRB.PdcpSduBitrateUl.QCI', np.float64),# bit/s 4.4.1.1 NOTE not kbit/s
('DRB.PdcpSduBitrateDl.QCI', np.float64),# bit/s 4.4.1.2 NOTE not kbit/s
# XXX mean is not good for our model
# TODO mean -> total + npkt?
#('DRB.IPLatDl.QCI', Ttime), # s 4.4.5.1 32.450:6.3.2 NOTE not ms
('DRB.IPLatDl.QCI', S(Ttime)), # s 4.4.5.1 32.450:6.3.2 NOTE not ms
# DRB.IPThpX.QCI = DRB.IPVolX.QCI / DRB.IPTimeX.QCI 4.4.6.1-2 32.450:6.3.1
('DRB.IPVolDl.QCI', np.int64), # bit 4.4.6.3 32.450:6.3.1 NOTE not kbit
......@@ -208,6 +225,8 @@ class Measurement(np.void):
('PEE.Energy', np.float64),# J 4.12.2 NOTE not kWh
])
del S
# Interval is NumPy structured scalar that represents [lo,hi) interval.
#
......@@ -275,6 +294,16 @@ def __new__(cls):
Σ[field]['τ_na'] = 0
return Σ
# Stat() creates new Stat instance with specified values and dtype.
@func(Stat)
def __new__(cls, min, avg, max, n, dtype=np.float64):
s = _newscalar(cls, cls._dtype_for(dtype))
s['min'] = min
s['avg'] = avg
s['max'] = max
s['n'] = n
return s
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too)
......@@ -368,6 +397,21 @@ def __str__(m):
vv.append(_vstr(m[field]))
return "(%s)" % ', '.join(vv)
# __repr__ returns Stat(min, avg, max, n, dtype=...)
# NA values are represented as "ø".
@func(Stat)
def __repr__(s):
return "Stat(%s, %s, %s, %s, dtype=%s)" % (_vstr(s['min']), _vstr(s['avg']),
_vstr(s['max']), _vstr(s['n']), s['min'].dtype)
# __str__ returns "<min avg max>·n"
# NA values are represented as "ø".
@func(Stat)
def __str__(s):
return "<%s %s %s>·%s" % (_vstr(s['min']), _vstr(s['avg']), _vstr(s['max']), _vstr(s['n']))
# _vstr returns string representation of scalar or subarray v.
def _vstr(v): # -> str
if v.shape == (): # scalar
......@@ -379,9 +423,17 @@ def _vstr(v): # -> str
va = [] # subarray with some non-ø data
for k in range(v.shape[0]):
if v[k] == 0:
continue
va.append('%d:%s' % (k, 'ø' if isNA(v[k]) else str(v[k])))
vk = v[k]
if isinstance(vk, np.void):
for name in vk.dtype.names:
if vk[name] != 0:
break
else:
continue
else:
if vk == 0:
continue
va.append('%d:%s' % (k, 'ø' if isNA(vk) else str(vk)))
return "{%s}" % ' '.join(va)
......@@ -424,8 +476,14 @@ def _check_valid(m):
continue
# * ≥ 0
if v < 0:
bad(".%s < 0 (%s)" % (field, v))
if not isinstance(v, np.void):
if v < 0:
bad(".%s < 0 (%s)" % (field, v))
else:
for vfield in v.dtype.names:
vf = v[vfield]
if not isNA(vf) and vf < 0:
bad(".%s.%s < 0 (%s)" % (field, vfield, vf))
# fini ≤ init
if "Succ" in field:
......@@ -705,6 +763,25 @@ def aggregate(calc): # -> ΣMeasurement
Σ['X.Tstart'] = calc.τ_lo
Σ['X.δT'] = calc.τ_hi - calc.τ_lo
def xmin(a, b):
if isNA(a): return b
if isNA(b): return a
return min(a, b)
def xmax(a, b):
if isNA(a): return b
if isNA(b): return a
return max(a, b)
def xavg(a, na, b, nb): # -> <ab>, na+nb
if isNA(a) or isNA(na):
return b, nb
if isNA(b) or isNA(nb):
return a, na
nab = na+nb
ab = (a*na + b*nb)/nab
return ab, nab
for m in calc._miter():
for field in m.dtype.names:
if field.startswith('X.'): # X.Tstart, X.δT
......@@ -714,12 +791,29 @@ def aggregate(calc): # -> ΣMeasurement
if v.shape != (): # skip subarrays - rely on aliases
continue
Σf = Σ[field] # view to Σ[field]
Σv = Σf['value'] # view to Σ[field]['value']
if isNA(v):
Σ[field]['τ_na'] += m['X.δT']
Σf['τ_na'] += m['X.δT']
continue
if isNA(Σv):
Σf['value'] = v
continue
if isinstance(v, np.number):
Σf['value'] += v
elif isinstance(v, Stat):
Σv['min'] = xmin(Σv['min'], v['min'])
Σv['max'] = xmax(Σv['max'], v['max'])
# TODO better sum everything and then divide as a whole to avoid loss of precision
Σv['avg'], Σv['n'] = xavg(Σv['avg'], Σv['n'],
v['avg'], v['n'])
else:
if isNA(Σ[field]['value']):
Σ[field]['value'] = 0
Σ[field]['value'] += v
raise AssertionError("Calc.aggregate: unexpected type %r" % type(v))
return Σ
......@@ -840,15 +934,20 @@ def NA(dtype):
typ = dtype.type
# float
if issubclass(typ, np.floating):
na = np.nan
na = typ(np.nan) # return the same type as dtype has, e.g. np.int32, not int
# int: NA is min value
elif issubclass(typ, np.signedinteger):
na = np.iinfo(typ).min
na = typ(np.iinfo(typ).min)
# structure: NA is combination of NAs for fields
elif issubclass(typ, np.void):
na = _newscalar(typ, dtype)
for field in dtype.names:
na[field] = NA(dtype.fields[field][0])
else:
raise AssertionError("NA not defined for dtype %s" % (dtype,))
return typ(na) # return the same type as dtype has, e.g. np.int32, not int
assert type(na) is typ
return na
# isNA returns whether value represent NA.
......@@ -857,6 +956,26 @@ def NA(dtype):
# returns array(True/False) if value is array.
def isNA(value):
na = NA(value.dtype)
if np.isnan(na):
return np.isnan(value) # `nan == nan` gives False
# `nan == nan` gives False
# work it around by checking for nan explicitly
if isinstance(na, np.void): # items are structured scalars
vna = None
for field in value.dtype.names:
nf = na[field]
vf = value[field]
if np.isnan(nf):
x = np.isnan(vf)
else:
x = (vf == nf)
if vna is None:
vna = x
else:
vna &= x
return vna
else:
if np.isnan(na):
return np.isnan(value)
return value == na
......@@ -21,10 +21,12 @@
from __future__ import print_function, division, absolute_import
from xlte.kpi import Calc, MeasurementLog, Measurement, ΣMeasurement, Interval, \
NA, isNA, Σqci, Σcause, nqci
Stat, NA, isNA, Σqci, Σcause, nqci
import numpy as np
from pytest import raises
ms = 1e-3
def test_Measurement():
m = Measurement()
......@@ -44,6 +46,8 @@ def test_Measurement():
_('DRB.IPVolDl.sum') # int64
_('DRB.IPTimeDl.7') # .QCI alias
_('DRB.IPTimeDl.QCI') # .QCI array
_('DRB.IPLatDl.7') # .QCI alias to Stat
_('DRB.IPLatDl.QCI') # .QCI array of Stat
# everything automatically
for name in m.dtype.names:
_(name)
......@@ -66,9 +70,21 @@ def test_Measurement():
continue
assert m['DRB.IPVolDl.%d' % k] == 0
assert m['DRB.IPVolDl.QCI'][k] == 0
m['DRB.IPLatDl.QCI'][:]['avg'] = 0
m['DRB.IPLatDl.QCI'][:]['min'] = 0
m['DRB.IPLatDl.QCI'][:]['max'] = 0
m['DRB.IPLatDl.QCI'][:]['n'] = 0
m['DRB.IPLatDl.QCI'][3]['avg'] = 33
m['DRB.IPLatDl.QCI'][3]['n'] = 123
m['DRB.IPLatDl.4']['avg'] = 44
m['DRB.IPLatDl.4']['n'] = 432
m['DRB.IPLatDl.8']['avg'] = NA(m['DRB.IPLatDl.8']['avg'].dtype)
m['DRB.IPLatDl.8']['n'] = NA(m['DRB.IPLatDl.8']['n'] .dtype)
# str/repr
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, DRB.IPVolDl.QCI={5:55 7:ø 9:99}, S1SIG.ConnEstabAtt=123)"
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, DRB.IPLatDl.QCI={3:<0.0 33.0 0.0>·123 4:<0.0 44.0 0.0>·432 8:<0.0 ø 0.0>·ø}, DRB.IPVolDl.QCI={5:55 7:ø 9:99}, S1SIG.ConnEstabAtt=123)"
assert repr(m['DRB.IPLatDl.3']) == "Stat(0.0, 33.0, 0.0, 123, dtype=float64)"
s = str(m)
assert s[0] == '('
assert s[-1] == ')'
......@@ -77,6 +93,7 @@ def test_Measurement():
vok[m.dtype.names.index("RRC.ConnEstabAtt.sum")] = "17"
vok[m.dtype.names.index("S1SIG.ConnEstabAtt")] = "123"
vok[m.dtype.names.index("DRB.IPVolDl.QCI")] = "{5:55 7:ø 9:99}"
vok[m.dtype.names.index("DRB.IPLatDl.QCI")] = "{3:<0.0 33.0 0.0>·123 4:<0.0 44.0 0.0>·432 8:<0.0 ø 0.0>·ø}"
assert v == vok
# verify that time fields has enough precision
......@@ -506,12 +523,14 @@ def test_Calc_aggregate():
m1['X.δT'] = 2
m1['S1SIG.ConnEstabAtt'] = 12 # Tcc
m1['ERAB.SessionTimeUE'] = 1.2 # Ttime
m1['DRB.IPLatDl.7'] = Stat(4*ms, 7.32*ms, 25*ms, 17) # Stat
m2 = Measurement()
m2['X.Tstart'] = 5 # NOTE [3,5) is NA hole
m2['X.δT'] = 3
m2['S1SIG.ConnEstabAtt'] = 11
m2['ERAB.SessionTimeUE'] = 0.7
m2['DRB.IPLatDl.7'] = Stat(3*ms, 5.23*ms, 11*ms, 11)
mlog.append(m1)
mlog.append(m2)
......@@ -532,6 +551,9 @@ def test_Calc_aggregate():
assert M['ERAB.SessionTimeUE']['value'] == 1.2 + 0.7
assert M['ERAB.SessionTimeUE']['τ_na'] == 5
assert M['DRB.IPLatDl.7']['value'] == Stat(3*ms, (7.32*17 + 5.23*11)/(17+11)*ms, 25*ms, 17+11)
assert M['DRB.IPLatDl.7']['τ_na'] == 5
# assert that everything else is NA with τ_na == 10
def _(name):
......@@ -542,7 +564,7 @@ def test_Calc_aggregate():
assert f['τ_na'] == 10
for name in M.dtype.names:
if name not in ('X.Tstart', 'X.δT', 'S1SIG.ConnEstabAtt',
'ERAB.SessionTimeUE'):
'ERAB.SessionTimeUE', 'DRB.IPLatDl.7'):
_(name)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment