Commit 06fefd0b authored by Kirill Smelkov's avatar Kirill Smelkov

py2: *: Drop type annotation from function signatures

Python2 does not support type annotations.
parent b56b6ba0
...@@ -659,7 +659,7 @@ def __repr__(s): ...@@ -659,7 +659,7 @@ def __repr__(s):
# NOTE we cannot go polling to higher than 100Hz frequency, since enb # NOTE we cannot go polling to higher than 100Hz frequency, since enb
# rate-limits websocket requests to execute not faster than 10ms each. # rate-limits websocket requests to execute not faster than 10ms each.
@func @func
def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn): def _x_stats_srv(ctx, reqch, conn):
dt_rate = 10*tti dt_rate = 10*tti
# rx_ue_get_stats sends `ue_get[stats]` request and returns server response. # rx_ue_get_stats sends `ue_get[stats]` request and returns server response.
......
...@@ -204,7 +204,7 @@ def _read(logm): ...@@ -204,7 +204,7 @@ def _read(logm):
# _handle_stats handles next stats xlog entry upon _read request. # _handle_stats handles next stats xlog entry upon _read request.
@func(LogMeasure) @func(LogMeasure)
def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): def _handle_stats(logm, stats, m_prev):
# build Measurement from stats' counters. # build Measurement from stats' counters.
# #
# we take δ(stats_prev, stat) and process it mapping Amarisoft counters to # we take δ(stats_prev, stat) and process it mapping Amarisoft counters to
...@@ -338,7 +338,7 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): ...@@ -338,7 +338,7 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# only configurations with one single cell are supported. # only configurations with one single cell are supported.
# ( because else it would not be clear to which cell to associate e.g. global # ( because else it would not be clear to which cell to associate e.g. global
# counters for S1 messages ) # counters for S1 messages )
def _stats_check(stats: xlog.Message): def _stats_check(stats):
cells = stats['cells'] cells = stats['cells']
if len(cells) != 1: if len(cells) != 1:
raise LogError(stats.timestamp, "stats describes %d cells; but only single-cell configurations are supported" % len(cells)) raise LogError(stats.timestamp, "stats describes %d cells; but only single-cell configurations are supported" % len(cells))
...@@ -355,7 +355,7 @@ def _stats_check(stats: xlog.Message): ...@@ -355,7 +355,7 @@ def _stats_check(stats: xlog.Message):
# #
# counter may be both "global" or "per-cell". # counter may be both "global" or "per-cell".
# stats is assumed to be already verified by _stats_check. # stats is assumed to be already verified by _stats_check.
def _stats_cc(stats: xlog.Message, counter: str): def _stats_cc(stats, counter):
cells = stats['cells'] cells = stats['cells']
cell = list(cells.values())[0] cell = list(cells.values())[0]
...@@ -369,7 +369,7 @@ def _stats_cc(stats: xlog.Message, counter: str): ...@@ -369,7 +369,7 @@ def _stats_cc(stats: xlog.Message, counter: str):
# _handle_drb_stats handles next x.drb_stats xlog entry upon _read request. # _handle_drb_stats handles next x.drb_stats xlog entry upon _read request.
@func(LogMeasure) @func(LogMeasure)
def _handle_drb_stats(logm, drb_stats: xlog.Message): def _handle_drb_stats(logm, drb_stats):
# TODO precheck for correct message structure similarly to _stats_check # TODO precheck for correct message structure similarly to _stats_check
drb_stats_prev = logm._drb_stats drb_stats_prev = logm._drb_stats
...@@ -413,7 +413,7 @@ def _handle_drb_stats(logm, drb_stats: xlog.Message): ...@@ -413,7 +413,7 @@ def _handle_drb_stats(logm, drb_stats: xlog.Message):
return return
# _drb_update updates Measurement from dl/ul DRB statistics related to measurement's time coverage. # _drb_update updates Measurement from dl/ul DRB statistics related to measurement's time coverage.
def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message): def _drb_update(m, drb_stats):
# TODO Exception -> LogError("internal failure") similarly to _handle_stats # TODO Exception -> LogError("internal failure") similarly to _handle_stats
qci_trx = drb_stats.get1("qci_dict", dict) qci_trx = drb_stats.get1("qci_dict", dict)
......
...@@ -374,7 +374,7 @@ class _XMsgServer: ...@@ -374,7 +374,7 @@ class _XMsgServer:
# run runs the extra server on amari service attached to via conn. # run runs the extra server on amari service attached to via conn.
@func @func
def run(xsrv, ctx, conn: amari.Conn, ready: chan): def run(xsrv, ctx, conn, ready):
xsrv._runCtx, cancel = context.with_cancel(ctx) xsrv._runCtx, cancel = context.with_cancel(ctx)
defer(cancel) defer(cancel)
ready.close() ready.close()
......
...@@ -78,7 +78,7 @@ ...@@ -78,7 +78,7 @@
"source": [ "source": [
"from xlte import kpi\n", "from xlte import kpi\n",
"\n", "\n",
"def load_measurements(alogm: akpi.LogMeasure) -> kpi.MeasurementLog:\n", "def load_measurements(alogm): # -> kpi.MeasurementLog:\n",
" mlog = kpi.MeasurementLog()\n", " mlog = kpi.MeasurementLog()\n",
" try:\n", " try:\n",
" while 1:\n", " while 1:\n",
...@@ -115,7 +115,7 @@ ...@@ -115,7 +115,7 @@
"from datetime import datetime\n", "from datetime import datetime\n",
"\n", "\n",
"# calc_each_period partitions mlog data into periods and yields kpi.Calc for each period.\n", "# calc_each_period partitions mlog data into periods and yields kpi.Calc for each period.\n",
"def calc_each_period(mlog: kpi.MeasurementLog, tperiod: float): # -> yield kpi.Calc\n", "def calc_each_period(mlog, tperiod): # -> yield kpi.Calc\n",
" tau = mlog.data()[0]['X.Tstart']\n", " tau = mlog.data()[0]['X.Tstart']\n",
" for m in mlog.data()[1:]:\n", " for m in mlog.data()[1:]:\n",
" tau_ = m['X.Tstart']\n", " tau_ = m['X.Tstart']\n",
......
...@@ -49,7 +49,7 @@ def main(): ...@@ -49,7 +49,7 @@ def main():
# of kpi.Measurement, which is driver-independent representation for # of kpi.Measurement, which is driver-independent representation for
# KPI-related measurement data. # KPI-related measurement data.
def load_measurements(alogm: akpi.LogMeasure) -> kpi.MeasurementLog: def load_measurements(alogm): # -> kpi.MeasurementLog
mlog = kpi.MeasurementLog() mlog = kpi.MeasurementLog()
while 1: while 1:
m = alogm.read() m = alogm.read()
...@@ -66,7 +66,7 @@ def main(): ...@@ -66,7 +66,7 @@ def main():
# period, and further use kpi.Calc to compute the KPIs over each period. # period, and further use kpi.Calc to compute the KPIs over each period.
# calc_each_period partitions mlog data into periods and yields kpi.Calc for each period. # calc_each_period partitions mlog data into periods and yields kpi.Calc for each period.
def calc_each_period(mlog: kpi.MeasurementLog, tperiod: float): # -> yield kpi.Calc def calc_each_period(mlog, tperiod): # -> yield kpi.Calc
tau = mlog.data()[0]['X.Tstart'] tau = mlog.data()[0]['X.Tstart']
for m in mlog.data()[1:]: for m in mlog.data()[1:]:
tau_ = m['X.Tstart'] tau_ = m['X.Tstart']
...@@ -133,7 +133,7 @@ def main(): ...@@ -133,7 +133,7 @@ def main():
# ---- plotting routines ---- # ---- plotting routines ----
# figplot_erab_accessibility plots E-RAB Accessibility KPI data on the figure. # figplot_erab_accessibility plots E-RAB Accessibility KPI data on the figure.
def figplot_erab_accessibility(fig: plt.Figure, vtau, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod=None): def figplot_erab_accessibility(fig, vtau, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod=None):
ax1, ax2 = fig.subplots(2, 1, sharex=True) ax1, ax2 = fig.subplots(2, 1, sharex=True)
fig.suptitle("E-RAB Accessibility / %s" % (tpretty(tperiod) if tperiod is not None else fig.suptitle("E-RAB Accessibility / %s" % (tpretty(tperiod) if tperiod is not None else
vtau_period_pretty(vtau))) vtau_period_pretty(vtau)))
...@@ -145,7 +145,7 @@ def figplot_erab_accessibility(fig: plt.Figure, vtau, vInititialEPSBEstabSR, vAd ...@@ -145,7 +145,7 @@ def figplot_erab_accessibility(fig: plt.Figure, vtau, vInititialEPSBEstabSR, vAd
# figplot_eutran_ip_throughput plots E-UTRAN IP Throughput KPI data on the figure. # figplot_eutran_ip_throughput plots E-UTRAN IP Throughput KPI data on the figure.
def figplot_eutran_ip_throughput(fig: plt.Figure, vtau, vIPThp_qci, tperiod=None): def figplot_eutran_ip_throughput(fig, vtau, vIPThp_qci, tperiod=None):
ax1, ax2 = fig.subplots(2, 1, sharex=True) ax1, ax2 = fig.subplots(2, 1, sharex=True)
fig.suptitle("E-UTRAN IP Throughput / %s" % (tpretty(tperiod) if tperiod is not None else fig.suptitle("E-UTRAN IP Throughput / %s" % (tpretty(tperiod) if tperiod is not None else
vtau_period_pretty(vtau))) vtau_period_pretty(vtau)))
......
...@@ -237,7 +237,7 @@ def __new__(cls): ...@@ -237,7 +237,7 @@ def __new__(cls):
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values. # _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too) # TODO remove and use direct array access (after causes are expanded into array too)
nqci = 256 # all possible QCIs ∈ [0,255], standard ones are described in 23.203 Table 6.1.7 nqci = 256 # all possible QCIs ∈ [0,255], standard ones are described in 23.203 Table 6.1.7
def _all_qci(name_qci: str): # -> name_sum, ()name_qciv def _all_qci(name_qci): # -> name_sum, ()name_qciv
if not name_qci.endswith(".QCI"): if not name_qci.endswith(".QCI"):
raise AssertionError("invalid name_qci %r: no .QCI suffix" % name_qci) raise AssertionError("invalid name_qci %r: no .QCI suffix" % name_qci)
name = name_qci[:-len(".QCI")] name = name_qci[:-len(".QCI")]
...@@ -245,7 +245,7 @@ def _all_qci(name_qci: str): # -> name_sum, ()name_qciv ...@@ -245,7 +245,7 @@ def _all_qci(name_qci: str): # -> name_sum, ()name_qciv
return name+".sum", name_qciv return name+".sum", name_qciv
# _all_cause expands <name>.CAUSE into <name>.sum and [] of <name>.<cause> for all possible cause values. # _all_cause expands <name>.CAUSE into <name>.sum and [] of <name>.<cause> for all possible cause values.
def _all_cause(name_cause: str): # -> name_sum, ()name_causev def _all_cause(name_cause): # -> name_sum, ()name_causev
if not name_cause.endswith(".CAUSE"): if not name_cause.endswith(".CAUSE"):
raise AssertionError("invalid name_cause %r: no .CAUSE suffix" % name_cause) raise AssertionError("invalid name_cause %r: no .CAUSE suffix" % name_cause)
name = name_cause[:-len(".CAUSE")] name = name_cause[:-len(".CAUSE")]
...@@ -409,7 +409,7 @@ def data(mlog): ...@@ -409,7 +409,7 @@ def data(mlog):
# append adds new Measurement to the tail of MeasurementLog. # append adds new Measurement to the tail of MeasurementLog.
@func(MeasurementLog) @func(MeasurementLog)
def append(mlog, m: Measurement): def append(mlog, m):
m._check_valid() m._check_valid()
# verify .Tstart↑ # verify .Tstart↑
if len(mlog._data) > 0: if len(mlog._data) > 0:
...@@ -448,7 +448,7 @@ def forget_past(mlog, Tcut): ...@@ -448,7 +448,7 @@ def forget_past(mlog, Tcut):
# The time interval, that will actually be used for computations, is potentially wider. # The time interval, that will actually be used for computations, is potentially wider.
# See Calc class documentation for details. # See Calc class documentation for details.
@func(Calc) @func(Calc)
def __init__(calc, mlog: MeasurementLog, tau_lo, tau_hi): def __init__(calc, mlog, tau_lo, tau_hi):
assert tau_lo <= tau_hi assert tau_lo <= tau_hi
data = mlog.data() data = mlog.data()
l = len(data) l = len(data)
...@@ -711,7 +711,7 @@ def __new__(cls, lo, hi): ...@@ -711,7 +711,7 @@ def __new__(cls, lo, hi):
# Sqci(m, 'ERAB.EstabInitSuccNbr.QCI') # Sqci(m, 'ERAB.EstabInitSuccNbr.QCI')
# #
# name_qci must have '.QCI' suffix. # name_qci must have '.QCI' suffix.
def Sqci(m: Measurement, name_qci: str): def Sqci(m, name_qci):
return _Sx(m, name_qci, _all_qci) return _Sx(m, name_qci, _all_qci)
# Scause, performs summation over all causes for m[name_cause]. # Scause, performs summation over all causes for m[name_cause].
...@@ -721,11 +721,11 @@ def Sqci(m: Measurement, name_qci: str): ...@@ -721,11 +721,11 @@ def Sqci(m: Measurement, name_qci: str):
# Scause(m, 'RRC.ConnEstabSucc.CAUSE') # Scause(m, 'RRC.ConnEstabSucc.CAUSE')
# #
# name_cause must have '.CAUSE' suffix. # name_cause must have '.CAUSE' suffix.
def Scause(m: Measurement, name_cause: str): def Scause(m, name_cause):
return _Sx(m, name_cause, _all_cause) return _Sx(m, name_cause, _all_cause)
# _Sx serves Sqci and Scause. # _Sx serves Sqci and Scause.
def _Sx(m: Measurement, name_x: str, _all_x: func): def _Sx(m, name_x, _all_x):
name_sum, name_xv = _all_x(name_x) name_sum, name_xv = _all_x(name_x)
s = m[name_sum] s = m[name_sum]
if not isNA(s): if not isNA(s):
...@@ -747,7 +747,7 @@ def _Sx(m: Measurement, name_x: str, _all_x: func): ...@@ -747,7 +747,7 @@ def _Sx(m: Measurement, name_x: str, _all_x: func):
# _i2pc maps Interval in [0,1] to one in [0,100] by multiplying lo/hi by 1e2. # _i2pc maps Interval in [0,1] to one in [0,100] by multiplying lo/hi by 1e2.
def _i2pc(x: Interval): # -> Interval def _i2pc(x): # -> Interval
return Interval(x['lo']*100, x['hi']*100) return Interval(x['lo']*100, x['hi']*100)
......
...@@ -432,7 +432,7 @@ def test_Calc_erab_accessibility(): ...@@ -432,7 +432,7 @@ def test_Calc_erab_accessibility():
calc = Calc(mlog, 10,20) calc = Calc(mlog, 10,20)
# _ asserts that provided interval is precise and equals vok. # _ asserts that provided interval is precise and equals vok.
def _(i: Interval, vok): def _(i, vok):
assert i['lo'] == i['hi'] assert i['lo'] == i['hi']
assert i['lo'] == vok assert i['lo'] == vok
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment