...
 
Commits (8)
  • Kirill Smelkov's avatar
    Pre-Alpha -> Alpha · bda7ab21
    Kirill Smelkov authored
    XLTE should be ready to be tried to be used for real now.
    bda7ab21
  • Kirill Smelkov's avatar
    kpi: Add way to compute aggregated counters + showcase this · bf96c767
    Kirill Smelkov authored
    - add Calc.cum to aggregate Measurements.
    
    - add ΣMeasurement type to represent result of this. It is very similar
      to Measurement, but every field comes accompanied with information
      about how much time there was no data for that field. In other words
      it is not all or nothing for NA in the result. For example a field
      might be present 90% of the time and NA only 10% of the time. We want to
      preserver knowledge about that 90% of valid values in the result. And we
      also want to know how much time there was no data.
    
    - amend kpidemo.py and kpidemo.ipynb to demonstrate this.
    bf96c767
  • Kirill Smelkov's avatar
    fixup! amari.xlog: Implement log rotation · ce383492
    Kirill Smelkov authored
    Adjust plain writer to append to log file instead of truncating it on
    the open. The rotating writers are already ok as they use "a" mode by default.
    ce383492
  • Kirill Smelkov's avatar
    amari.xlog: Test and polish str(LogSpec) · 70b4b71c
    Kirill Smelkov authored
    If the parsed period was '60s' we were printing it back as '60.0s' on str.
    Fix it by using %g insted of %s.
    70b4b71c
  • Kirill Smelkov's avatar
    amari.xlog: Add support for arbitrary query options · bcfd82dd
    Kirill Smelkov authored
    Before this patch we were supporting only boolean option flags - with, for
    example, stats[rf] meaning stats query with {"rf": True} arguments. Now we add
    support for arbitrary types, so that it is possible to specify e.g. integer or
    string query options, as well as some boolean flag set to false.
    
    This should be good for generality.
    
    For backward compatibility the old way to implicitly specify "on" flags is
    continued to be supported.
    bcfd82dd
  • Kirill Smelkov's avatar
    earfcn: New package to do computations with LTE bands, frequencies and EARFCN numbers · 6cb9d37f
    Kirill Smelkov authored
    Do a package which provides calculations like EARFCN -> frequency,
    EARFCN -> band info, and to convert DL/UL EARFCN in between each other.
    
    I was hoping to find something ready on the net, but could find only
    pypi.org/project/nrarfcn for 5G, while for LTE everything I found
    was of lesser quality and capability.
    
    -> So do it myself.
    
    See package documentation for API details.
    6cb9d37f
  • Kirill Smelkov's avatar
    nrarfcn: New package to do computations with NR bands, frequencies and NR-ARFCN numbers. · b8065120
    Kirill Smelkov authored
    Do a package for converting DL/UL NR-ARFCN in between each other and to
    convert DL NR-ARFCN to SSB NR-ARFCN. The API mimics xlte.earfcn added in 6cb9d37f.
    
    xlte.nrarfcn complements pypi.org/project/nrarfcn, which we use here under the hood.
    
    See package documentation for API details.
    b8065120
  • Kirill Smelkov's avatar
    nrarfcn: Fix behaviour on invalid input parameters · 8e606c64
    Kirill Smelkov authored
    Contrary to earfcn, where band can be automatically deduced from earfcn
    number because 4G bands never overlap, most functions in nrarfcn accept
    as input parameters both nr_arfcn and band, because 5G bands can and do
    overlap. As the result it is possible to invoke e.g. dl2ul with
    dl_nr_arfcn being outside of downlink spectrum of specified band.
    
    However in b8065120 I've made a thinko and handled such situation with
    simple assert which does not lead to useful error feedback from a user
    perspective, for example:
    
        In [2]: xnrarfcn.dl2ul(10000, 1)
        ---------------------------------------------------------------------------
        AssertionError                            Traceback (most recent call last)
        Cell In[2], line 1
        ----> 1 n.dl2ul(10000, 1)
    
        File ~/src/wendelin/xlte/nrarfcn.py:85, in dl2ul(dl_nr_arfcn, band)
             83 if dl_lo == 'N/A':
             84     raise AssertionError('band%r does not have downlink spectrum' % band)
        ---> 85 assert dl_lo <= dl_nr_arfcn <= dl_hi
             86 ul_lo, ul_hi = nr.get_nrarfcn_range(band, 'ul')
             87 if ul_lo == 'N/A':
    
        AssertionError:
    
    The issue here is that asserts can be used to only verify internal
    invariants, and that reported error does not provide details about which
    nrarfcn and band were used in the query.
    
    -> Fix this by providing details in the error reported to incorrect
    module usage, and by consistently raising ValueError for "invalid
    parameters" cases.
    
    The reported error for above example now becomes
    
        ValueError: band1: NR-ARFCN=10000 is outside of downlink spectrum
    8e606c64
......@@ -4,6 +4,8 @@
XLTE repository provides assorted tools and packages with functionality related to LTE:
- `earfcn` - do computations with LTE bands, frequencies and EARFCN numbers.
- `nrarfcn` - do computations with NR bands, frequencies and NR-ARFCN numbers.
- `kpi` - process measurements and compute KPIs from them.
- `amari.drb` - infrastructure to process flows on data radio bearers.
- `amari.kpi` - driver for Amarisoft LTE stack to retrieve KPI-related measurements from logs.
......
......@@ -85,18 +85,24 @@ log = logging.getLogger('xlte.amari.xlog')
# For example stats[rf]/10s.
class LogSpec:
# .query e.g. 'stats'
# .optv [] with flags to send with query
# .opts {} opt -> value to send with query
# .period how often to issue the query (seconds)
DEFAULT_PERIOD = 60
def __init__(spec, query, optv, period):
def __init__(spec, query, opts, period):
spec.query = query
spec.optv = optv
spec.opts = opts
spec.period = period
def __str__(spec):
return "%s[%s]/%ss" % (spec.query, ','.join(spec.optv), spec.period)
optv = []
for opt, val in spec.opts.items():
if val is True:
optv.append(opt)
else:
optv.append('%s=%s' % (opt, json.dumps(val)))
return "%s[%s]/%gs" % (spec.query, ','.join(optv), spec.period)
# LogSpec.parse parses text into LogSpec.
@staticmethod
......@@ -104,7 +110,7 @@ class LogSpec:
def bad(reason):
raise ValueError("invalid logspec %s: %s" % (qq(text), reason))
optv = []
opts = {}
period = LogSpec.DEFAULT_PERIOD
query = text
_ = query.rfind('/')
......@@ -126,13 +132,19 @@ class LogSpec:
if _ == -1:
bad("missing closing ]")
optv = tail[1:_].split(',')
for opt in optv:
val = True
if '=' in opt:
opt, val = opt.split('=', 1)
val = json.loads(val)
opts[opt] = val
tail = tail[_+1:]
for c in '[]/ ':
if c in query:
bad("invalid query")
return LogSpec(query, optv, period)
return LogSpec(query, opts, period)
# IWriter represents output to where xlog writes its data.
......@@ -166,10 +178,10 @@ def xlog(ctx, wsuri, w: IWriter, logspecv):
logspecv = logspecv[:] # keep caller's intact
if lsync is None:
isync = 0
lsync = LogSpec("meta.sync", [], pmax*10)
lsync = LogSpec("meta.sync", {}, pmax*10)
logspecv.insert(0, lsync)
if lconfig_get is None:
logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period))
logspecv.insert(isync+1, LogSpec("config_get", {}, lsync.period))
# verify that sync will come at least every LOS_window records
ns = 0
......@@ -354,10 +366,6 @@ class _XLogger:
logspec = xl.logspecv[imin]
tnextv[imin] += logspec.period
opts = {}
for opt in logspec.optv:
opts[opt] = True
# issue queries with planned schedule
# TODO detect time overruns and correct schedule correspondingly
tnow = time.now()
......@@ -382,7 +390,7 @@ class _XLogger:
xl.jemit_sync("attached", "periodic", isync)
else:
t_rx, resp, resp_raw = req_(ctx, logspec.query, opts)
t_rx, resp, resp_raw = req_(ctx, logspec.query, logspec.opts)
srv_time = resp["time"]
srv_utc = resp.get("utc")
xl.emit(resp_raw)
......@@ -494,7 +502,7 @@ def _openwriter(path: str, rotatespec) -> IWriter:
# _PlainWriter implements writer that emits data to plain file without rotation.
class _PlainWriter(IWriter):
def __init__(w, path):
w.f = open(path, "w")
w.f = open(path, "a")
def writeline(w, line: str):
w.f.write(line+'\n')
......
......@@ -300,12 +300,13 @@ def test_Reader_timestamp_from_sync_wo_utc():
def test_LogSpec():
logspec = "stats[samples,rf]/60s"
logspec = 'stats[samples,rf,abc=123,def="hello world"]/60s'
spec = xlog.LogSpec.parse(logspec)
assert spec.query == "stats"
assert spec.optv == ["samples", "rf"]
assert spec.opts == {"samples": True, "rf": True, "abc": 123, "def": "hello world"}
assert spec.period == 60.0
assert str(spec) == logspec
def test_ReverseLineReader():
......
This source diff could not be displayed because it is too large. You can view the blob instead.
#!/usr/bin/env python
"""kpidemo - plot KPIs computed from enb.xlog
Also print total for raw counters.
Usage: kpidemo <time period> <enb.xlog uri>
"""
......@@ -127,7 +129,15 @@ def main():
facc, fthp = fig.subfigures(1, 2)
figplot_erab_accessibility (facc, vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)
figplot_eutran_ip_throughput(fthp, vτ, vIPThp_qci, tperiod)
plt.show()
defer(plt.show)
# Step 5. Print total for raw counters.
mhead = mlog.data()[0]
mtail = mlog.data()[-1]
calc_total = kpi.Calc(mlog, mhead['X.Tstart'], mtail['X.Tstart']+mtail['X.δT'])
Σ = calc_total.sum()
print_ΣMeasurement(Σ)
# ---- plotting routines ----
......@@ -238,5 +248,43 @@ def vτ_period_pretty(vτ):
return "%s ±%s [%s, %s]" % (tpretty(avg), tpretty(std), tpretty(min), tpretty(max))
# ---- printing routines ----
# print_ΣMeasurement prints aggregated counters.
def print_ΣMeasurement(Σ: kpi.ΣMeasurement):
print("Time:\t%s - %s" % (datetime.fromtimestamp(Σ['X.Tstart']),
datetime.fromtimestamp(Σ['X.Tstart'] + Σ['X.δT'])))
# emit1 prints one field.
def emit1(name, v, τ_na):
fmt = "%12s "
if kpi.isNA(v):
s = fmt % "NA"
else:
if isinstance(v, np.floating):
fmt = "%15.2f"
s = fmt % v
pna = τ_na / Σ['X.δT'] * 100
if pna >= 0.01:
s += " (%.2f%% NA)" % pna
print("%-32s:\t%s" % (name, s))
for field in Σ._dtype0.names:
if field in ('X.Tstart', 'X.δT'):
continue
v = Σ[field]['value']
τ_na = Σ[field]['τ_na']
if v.shape == (): # scalar
emit1(field, v, τ_na)
else:
assert len(v.shape) == 1
if kpi.isNA(v).all(): # subarray full of ø
emit1(field, v[0], τ_na[0])
else: # subarray with some non-ø data
for k in range(v.shape[0]):
if v[k] != 0:
fieldk = '%s.%d' % (field[:field.rfind('.')], k) # name.QCI -> name.k
emit1(fieldk, v[k], τ_na[k])
if __name__ == '__main__':
main()
This diff is collapsed.
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.earfcn import frequency, dl2ul, ul2dl, _band_tab
from xlte import earfcn
from pytest import raises
# verify earfcn-related calculations wrt several points.
def test_earfcn():
def _(band, dl_earfcn, ul_earfcn, fdl, ful, rf_mode, fdl_lo, fdl_hi_, ful_lo, ful_hi_):
assert frequency(dl_earfcn) == fdl
if ul_earfcn is not None:
assert dl2ul(dl_earfcn) == ul_earfcn
else:
assert rf_mode in ('FDD', 'SDL')
if rf_mode == 'FDD':
estr = 'does not have enough uplink spectrum'
if rf_mode == 'SDL':
estr = 'does not have uplink spectrum'
with raises(KeyError, match=estr):
dl2ul(dl_earfcn)
b, isdl = earfcn.band(dl_earfcn)
assert isdl == True
if ul_earfcn is not None:
assert frequency(ul_earfcn) == ful
assert ul2dl(ul_earfcn) == dl_earfcn
b_, isdl_ = earfcn.band(ul_earfcn)
assert isdl_ == (rf_mode == 'TDD')
assert b == b_
assert b.band == band
assert b.rf_mode == rf_mode
assert b.fdl_lo == fdl_lo
assert b.fdl_hi_ == fdl_hi_
assert b.ful_lo == ful_lo
assert b.ful_hi_ == ful_hi_
# band dl ul fdl ful rf_mode fdl_lo fdl_hi_ ful_lo ful_hi_
_( 1, 300, 18300, 2140, 1950, 'FDD', 2110, 2170, 1920, 1980)
_(37, 37555, 37555, 1910.5, 1910.5, 'TDD', 1910, 1930, 1910, 1930)
_(29, 9700, None, 721, None, 'SDL', 717, 728, None, None)
_(66, 67135, 132671, 2179.9, 1779.9, 'FDD', 2110, 2200, 1710, 1780)
_(66, 67136, None, 2180, None, 'FDD', 2110, 2200, 1710, 1780) # NOTE B66 has different amount in dl and ul ranges
# verify that earfcn regions of all bands do not overlap.
def test_bands_no_earfcn_overlap():
rv = [] # of (nlo, nhi)
for b in _band_tab:
assert b.ndl_lo is not None
assert b.ndl_hi is not None
rv.append((b.ndl_lo, b.ndl_hi))
if b.rf_mode not in ('TDD', 'SDL'):
assert b.nul_lo is not None
assert b.nul_hi is not None
rv.append((b.nul_lo, b.nul_hi))
for i in range(len(rv)):
ilo, ihi = rv[i]
assert ilo < ihi
for j in range(len(rv)):
if j == i:
continue
jlo, jhi = rv[j]
assert jlo < jhi
if not ((ihi < jlo) or (jhi < ilo)):
assert False, "(%r, %r) overlaps with (%r, %r)" % (ilo, ihi, jlo, jhi)
......@@ -21,7 +21,8 @@
- Calc is KPI calculator. It can be instantiated on MeasurementLog and time
interval over which to perform computations. Use Calc methods such as
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs.
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs, and .sum()
to compute aggregated measurements.
- MeasurementLog maintains journal with result of measurements. Use .append()
to populate it with data.
......@@ -70,8 +71,9 @@ from golang import func
# ──────|─────|────[────|────)──────|──────|────────>
# ←─ τ_lo τ_hi ──→ time
#
# It is also possible to merely aggregate measured values via .sum() .
#
# See also: MeasurementLog, Measurement.
# See also: MeasurementLog, Measurement, ΣMeasurement.
class Calc:
# ._data []Measurement - fully inside [.τ_lo, .τ_hi)
# [.τ_lo, .τ_hi) time interval to compute over. Potentially wider than originally requested.
......@@ -217,6 +219,27 @@ class Interval(np.void):
])
# ΣMeasurement represents result of aggregation of several Measurements.
#
# It is similar to Measurement, but each value comes accompanied with
# information about how much time there was no data for that field:
#
# Σ[f].value = Σ Mi[f] if Mi[f] ≠ NA
# i
#
# Σ[f].τ_na = Σ Mi[X.δT] if Mi[f] = NA
# i
class ΣMeasurement(np.void):
_ = []
for name in Measurement._dtype.names:
typ = Measurement._dtype.fields[name][0].type
if not name.startswith('X.'): # X.Tstart, X.δT
typ = np.dtype([('value', typ), ('τ_na', Measurement.Ttime)])
_.append((name, typ))
_dtype = np.dtype(_)
del _
# ----------------------------------------
# Measurement is the central part around which everything is organized.
# Let's have it go first.
......@@ -233,6 +256,23 @@ def __new__(cls):
m[field][:] = NA(fdtype.base) # subarray
return m
# ΣMeasurement() creates new ΣMeasurement instance.
#
# For all fields .value is initialized with NA and .τ_na with 0.
@func(ΣMeasurement)
def __new__(cls):
Σ = _newscalar(cls, cls._dtype)
for field in Σ.dtype.names:
fdtype = Σ.dtype.fields[field][0]
if fdtype.shape != (): # skip subarrays - rely on aliases
continue
if field.startswith('X.'): # X.Tstart, X.δT
Σ[field] = NA(fdtype)
else:
Σ[field]['value'] = NA(fdtype.fields['value'][0])
Σ[field]['τ_na'] = 0
return Σ
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too)
......@@ -251,26 +291,26 @@ def _all_cause(name_cause: str): # -> name_sum, ()name_causev
name = name_cause[:-len(".CAUSE")]
return name+".sum", () # TODO add all possible CAUSEes - TS 36.331 (RRC)
# expand all .QCI and .CAUSE in Measurement._dtype .
def _():
# expand all .QCI and .CAUSE in ._dtype of Measurement and ΣMeasurement.
def _(Klass):
# expand X.QCI -> X.sum + X.QCI[nqci]
qnamev = [] # X from X.QCI
expv = [] # of (name, typ[, shape])
for name in Measurement._dtype .names:
typ = Measurement._dtype .fields[name][0].type
for name in Klass._dtype .names:
dtyp = Klass._dtype .fields[name][0]
if name.endswith('.QCI'):
_ = name[:-len('.QCI')]
qnamev.append(_)
expv.append(('%s.sum' % _, typ)) # X.sum
expv.append((name, typ, nqci)) # X.QCI[nqci]
expv.append(('%s.sum' % _, dtyp)) # X.sum
expv.append((name, dtyp, nqci)) # X.QCI[nqci]
elif name.endswith('.CAUSE'):
Σ, causev = _all_cause(name)
for _ in (Σ,)+causev:
expv.append((_, typ))
expv.append((_, dtyp))
else:
expv.append((name, typ))
expv.append((name, dtyp))
_dtype = np.dtype(expv)
......@@ -292,14 +332,15 @@ def _():
formatv.append(qarr.base)
offsetv.append(off0 + qci*qarr.base.itemsize)
Measurement._dtype0 = _dtype # ._dtype without aliases
Measurement._dtype = np.dtype({
Klass._dtype0 = _dtype # ._dtype without aliases
Klass._dtype = np.dtype({
'names': namev,
'formats': formatv,
'offsets': offsetv,
})
assert Measurement._dtype.itemsize == Measurement._dtype0.itemsize
_()
assert Klass._dtype.itemsize == Klass._dtype0.itemsize
_(Measurement)
_(ΣMeasurement)
del _
......@@ -657,6 +698,33 @@ def eutran_ip_throughput(calc): # -> IPThp[QCI][dl,ul]
return thp
# sum aggregates values of all Measurements in covered time interval.
# TODO tests
@func(Calc)
def sum(calc): # -> ΣMeasurement
Σ = ΣMeasurement()
Σ['X.Tstart'] = calc.τ_lo
Σ['X.δT'] = calc.τ_hi - calc.τ_lo
for m in calc._miter():
for field in m.dtype.names:
if field.startswith('X.'): # X.Tstart, X.δT
continue
v = m[field]
if v.shape != (): # skip subarrays - rely on aliases
continue
if isNA(v):
Σ[field]['τ_na'] += m['X.δT']
else:
if isNA(Σ[field]['value']):
Σ[field]['value'] = 0
Σ[field]['value'] += v
return Σ
# _miter iterates through [.τ_lo, .τ_hi) yielding Measurements.
#
# The measurements are yielded with consecutive timestamps. There is no gaps
......
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package xlte.nrarfcn helps to do computations with NR bands, frequencies and NR-ARFCN numbers.
It complements pypi.org/project/nrarfcn and provides the following additional utilities:
- frequency converts NR-ARFCN to frequency.
- dl2ul and ul2dl convert between DL NR-ARFCN and UL NR-ARFCN corresponding to
each other in particular band.
- dl2ssb returns SSB NR-ARFCN that is located nearby DL NR-ARFCN on Global Synchronization Raster.
See also package xlte.earfcn which provides similar functionality for 4G.
"""
# import pypi.org/project/nrarfcn with avoiding name collision with xlte.nrarfcn even if xlte is installed in editable mode.
def _():
modname = 'nrarfcn'
import sys, importlib.util
import xlte
# if already imported - we are done.
# but if previously ran `import nrarfcn` resolved to xlte/nrarfcn.py due to
# the way how easy_install handles xlte editable install with adding xlte
# onto sys.path, undo that.
mod = sys.modules.get(modname)
if mod is not None:
if mod.__spec__.origin == __spec__.origin:
del sys.modules[modname]
mod = None
if mod is not None:
return mod
# import nrarfcn with ignoring xlte.nrarfcn spec
# based on https://docs.python.org/3/library/importlib.html#approximating-importlib-import-module
# we also ignore cwd/xlte, if automatically injected to sys.path[0] by python and pytest, so that running things in xlte/ also work
pathsave = {} # idx -> sys.path[idx]
for p in [''] + xlte.__path__:
try:
i = sys.path.index(p)
except ValueError:
pass
else:
pathsave[i] = p
for i in sorted(pathsave, reverse=True):
sys.path.pop(i)
try:
for finder in sys.meta_path:
spec = finder.find_spec(modname, None)
if spec is not None and spec.origin != __spec__.origin:
break
else:
raise ModuleNotFoundError('Module %r not found' % modname)
finally:
for i in sorted(pathsave):
sys.path.insert(i, pathsave[i])
mod = importlib.util.module_from_spec(spec)
assert modname not in sys.modules
sys.modules[modname] = mod
spec.loader.exec_module(mod)
return mod
nr = _()
# dl2ul returns UL NR-ARFCN that corresponds to DL NR-ARFCN and band.
def dl2ul(dl_nr_arfcn, band): # -> ul_nr_arfcn
dl_lo, dl_hi = nr.get_nrarfcn_range(band, 'dl')
if dl_lo == 'N/A':
raise ValueError('band%r does not have downlink spectrum' % band)
if not (dl_lo <= dl_nr_arfcn <= dl_hi):
raise ValueError('band%r: NR-ARFCN=%r is outside of downlink spectrum' % (band, dl_nr_arfcn))
ul_lo, ul_hi = nr.get_nrarfcn_range(band, 'ul')
if ul_lo == 'N/A':
raise KeyError('band%r, to which DL NR-ARFCN=%r belongs, does not have uplink spectrum' % (band, dl_nr_arfcn))
if dl_nr_arfcn - dl_lo > ul_hi - ul_lo:
raise KeyError('band%r does not have enough uplink spectrum to provide pair for NR-ARFCN=%r' % (band, dl_nr_arfcn))
ul_nr_arfcn = ul_lo + (dl_nr_arfcn - dl_lo)
assert ul_lo <= ul_nr_arfcn <= ul_hi
return ul_nr_arfcn
# ul2dl returns DL NR-ARFCN that corresponds to UL NR-ARFCN and band.
def ul2dl(ul_nr_arfcn, band): # -> dl_nr_arfcn
ul_lo, ul_hi = nr.get_nrarfcn_range(band, 'ul')
if ul_lo == 'N/A':
raise ValueError('band%r does not have uplink spectrum' % band)
if not (ul_lo <= ul_nr_arfcn <= ul_hi):
raise ValueError('band%r: NR-ARFCN=%r is outside of uplink spectrum' % (band, ul_nr_arfcn))
dl_lo, dl_hi = nr.get_nrarfcn_range(band, 'dl')
if dl_lo == 'N/A':
raise KeyError('band%r, to which UL NR-ARFCN=%r belongs, does not have downlink spectrum' % (band, ul_nr_arfcn))
if ul_nr_arfcn - ul_lo > dl_hi - dl_lo:
raise KeyError('band%r does not have enough downlink spectrum to provide pair for NR-ARFCN=%r' % (band, ul_nr_arfcn))
dl_nr_arfcn = dl_lo + (ul_nr_arfcn - ul_lo)
assert dl_lo <= dl_nr_arfcn <= dl_hi
return dl_nr_arfcn
# dl2ssb returns SSB NR-ARFCN that is located nearby DL NR-ARFCN on Global Synchronization Raster.
#
# input Fdl should be aligned with ΔFraster.
# for return (Fdl - Fssb) is aligned with some SSB SubCarrier Spacing of given band.
# max_ssb_scs_khz indicates max SSB SubCarrier Spacing for which it was possible to find Fssb constrained with above alignment requirement.
#
# KeyError is raised if Fssb is not possible to find for given Fdl and band.
# ValueError is raised if input parameters are incorrect.
def dl2ssb(dl_nr_arfcn, band): # -> ssb_nr_arfcn, max_ssb_scs_khz
_trace('\ndl2ssb %r %r' % (dl_nr_arfcn, band))
dl_lo, dl_hi = nr.get_nrarfcn_range(band, 'dl')
if dl_lo == 'N/A':
raise ValueError('band%r does not have downlink spectrum' % band)
if not (dl_lo <= dl_nr_arfcn <= dl_hi):
raise ValueError('band%r: NR-ARFCN=%r is outside of downlink spectrum' % (band, dl_nr_arfcn))
f = frequency(nrarfcn=dl_nr_arfcn)
_trace('f %.16g' % f)
# query all SSB SCS available in this band
if isinstance(band, int):
band = 'n%d' % band
tab_fr1 = nr.tables.applicable_ss_raster_fr1.table_applicable_ss_raster_fr1()
tab_fr2 = nr.tables.applicable_ss_raster_fr2.table_applicable_ss_raster_fr2()
scs_v = []
for tab in (tab_fr1, tab_fr2):
for row in tab.data:
if tab.get_cell(row, 'band') == band:
scs_v.append( tab.get_cell(row, 'scs') )
# for each scs↓ try to find suitable sync point
for scs_khz in sorted(scs_v, reverse=True):
_trace('trying scs %r' % scs_khz)
scs = scs_khz / 1000 # khz -> mhz
# locate nearby point on global sync raster and further search around it
# until sync point aligns to be multiple of scs
gscn = nr.get_gscn_by_frequency(f)
while 1:
f_sync = nr.get_frequency_by_gscn(gscn)
f_sync_arfcn = nr.get_nrarfcn(f_sync)
if not (dl_lo <= f_sync_arfcn <= dl_hi):
break
# check `(f_sync - f) % scs == 0` with tolerating fp rounding
δf = f_sync - f
q, r = divmod(δf, scs)
r_scs = r / scs
_trace('gscn %d\tf_sync %.16g (%d) δf %+.3f //scs %d %%scs %.16g·scs' % (gscn, f_sync, nr.get_nrarfcn(f_sync), δf, q, r_scs))
if abs(r_scs - round(r_scs)) < 1e-5:
_trace('-> %d %d' % (f_sync_arfcn, scs_khz))
return f_sync_arfcn, scs_khz
gscn += (+1 if δf > 0 else -1)
raise KeyError('dl2ssb %r %s: cannot find SSB frequency that is both on GSR and aligns from dl modulo SSB SCS of the given band' % (dl_nr_arfcn, band))
# frequency returns frequency corresponding to DL or UL NR-ARFCN.
def frequency(nrarfcn): # -> freq (MHz)
return nr.get_frequency(nrarfcn)
_debug = False
def _trace(*argv):
if _debug:
print(*argv)
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.nrarfcn import frequency, dl2ul, ul2dl, dl2ssb
from xlte.nrarfcn import nr
from pytest import raises
# verify nrarfcn-related calculations wrt several points.
def test_nrarfcn():
def _(band, dl_nr_arfcn, ul_nr_arfcn, fdl, ful, rf_mode, ssb_nr_arfcn, max_ssb_scs_khz):
assert rf_mode == nr.get_duplex_mode(band).upper()
if dl_nr_arfcn is not None:
assert frequency(dl_nr_arfcn) == fdl
if ul_nr_arfcn is not None:
assert dl2ul(dl_nr_arfcn, band) == ul_nr_arfcn
else:
assert rf_mode in ('FDD', 'SDL')
if rf_mode == 'FDD':
estr = 'does not have enough uplink spectrum'
if rf_mode == 'SDL':
estr = 'does not have uplink spectrum'
with raises(KeyError, match=estr):
dl2ul(dl_nr_arfcn, band)
if ul_nr_arfcn is not None:
assert frequency(ul_nr_arfcn) == ful
if dl_nr_arfcn is not None:
assert ul2dl(ul_nr_arfcn, band) == dl_nr_arfcn
else:
assert rf_mode in ('FDD', 'SUL')
if rf_mode == 'FDD':
estr = 'does not have enough downlink spectrum'
if rf_mode == 'SUL':
estr = 'does not have downlink spectrum'
with raises(KeyError, match=estr):
ul2dl(ul_nr_arfcn, band)
if dl_nr_arfcn is not None:
if not isinstance(ssb_nr_arfcn, type):
assert dl2ssb(dl_nr_arfcn, band) == (ssb_nr_arfcn, max_ssb_scs_khz)
else:
with raises(ssb_nr_arfcn):
dl2ssb(dl_nr_arfcn, band)
# band dl ul fdl ful rf_mode ssb max_ssb_scs_khz
_( 1, 428000, 390000, 2140, 1950, 'FDD', 427970, 15)
_( 2, 396000, 380000, 1980, 1900, 'FDD', 396030, 15)
_( 5, 176300, 167300, 881.5, 836.5, 'FDD', 176210, 30)
_( 5, 176320, 167320, 881.6, 836.6, 'FDD', 176410, 30)
_( 7, 526000, 502000, 2630, 2510, 'FDD', 526090, 15)
_( 29, 144500, None, 722.5, None, 'SDL', 144530, 15)
_( 39, 378000, 378000, 1890, 1890, 'TDD', 378030, 30) # % 30khz = 0
_( 39, 378003, 378003, 1890.015, 1890.015, 'TDD', 378030, 15) # % 15khz = 0 % 30khz ≠ 0
_( 38, 520000, 520000, 2600, 2600, 'TDD', 520090, 30)
_( 41, 523020, 523020, 2615.1, 2615.1, 'TDD', 522990, 30) # % 30khz = 0
_( 41, 523023, 523023, 2615.115, 2615.115, 'TDD', 522990, 15) # % 15khz = 0 % 30khz ≠ 0
_( 66, 431000, 351000, 2155, 1755, 'FDD', 431090, 30)
_( 66, 437000, None, 2185, None, 'FDD', 437090, 30) # NOTE in n66 range(dl) > range(ul)
_( 78, 632628, 632628, 3489.42, 3489.42, 'TDD', 632640, 30)
_( 91, 285900, 166900, 1429.5, 834.5, 'FDD', 285870, 15)
_( 91, None, 172400, None, 862, 'FDD', None, None) # NOTE in n91 range(dl) < range(ul)
_( 80, None, 342000, None, 1710, 'SUL', None, None)
_(257, 2079167, 2079167, 28000.08, 28000.08, 'TDD', 2079163, 240) # FR2-1
_(257, 2079169, 2079169, 28000.20, 28000.20, 'TDD', 2079163, 120) # FR2-1 % 240khz ≠ 0
_(263, 2680027, 2680027, 64051.68, 64051.68, 'TDD', 2679931, 960) # FR2-2
_(263, 2680003, 2680003, 64050.24, 64050.24, 'TDD', 2679931, 480) # FR2-2 % 960khz ≠ 0
_(263, 2679991, 2679991, 64049.52, 64049.52, 'TDD', 2679931, 120) # FR2-2 % 480khz ≠ 0
# some dl points not on ΔFraster -> ssb cannot be found
_( 78, 632629, 632629, 3489.435, 3489.435, 'TDD', KeyError, None)
_(257, 2079168, 2079168, 28000.14, 28000.14, 'TDD', KeyError, None)
# error in input parameters -> ValueError
def edl(band, dl_nr_arfcn, estr):
for f in (dl2ul, dl2ssb):
with raises(ValueError, match=estr):
f(dl_nr_arfcn, band)
def eul(band, ul_nr_arfcn, estr):
for f in (ul2dl,):
with raises(ValueError, match=estr):
f(ul_nr_arfcn, band)
# no x spectrum when requesting x2y
edl(80, 10000, 'band80 does not have downlink spectrum') # SUL
eul(29, 10000, 'band29 does not have uplink spectrum') # SDL
# mismatch between x_nr_arfcn and band
edl( 1, 10000, 'band1: NR-ARFCN=10000 is outside of downlink spectrum')
eul( 1, 10000, 'band1: NR-ARFCN=10000 is outside of uplink spectrum')
......@@ -90,6 +90,7 @@ setup(
'websocket-client',
'pygolang',
'numpy',
'nrarfcn',
],
extras_require = {
......@@ -105,7 +106,7 @@ setup(
},
classifiers = [_.strip() for _ in """\
Development Status :: 2 - Pre-Alpha
Development Status :: 2 - Alpha
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.9
......