Commit 9cd06cb9 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.drb: Add _CTXBytesSplitter to split total tx_bytes into per-cell parts

To be able to compute E-UTRAN IP Throughput amari.drb uses Sampler which
detects and extracts separate transmission samples from 100Hz log of
ue_get[stats]. The Sampler, in turn uses help from _BitSync for correct
operations because Amarisoft LTEENB updates counters for dl_total_bytes
and dl_tx at different times, and _BitSync synchronizes streams of those
updates in time. _BitSync itself works by correlating amount of
transmitted data (tx_bytes) and transport blocks (#tx) and shifting some
amount of #tx to previous frame based on the correlation. See d102ffaa
(amari.drb: Start of the package) for details.

This works ok for configurations with 1 cell, but does not work out of the box
for multicell configurations because #tx is per-cell value, while e.g.
dl_total_bytes is reported by LTEENB only as a value aggregated over all cells.
That's why original implementation in d102ffaa had an assert that the
number of cells an UE is associated with is 1.

-> Implement custom filter that splits overall tx_bytes into per-cell
   parts via heuristic based on reported per-cell bitrates to workaround
   that: the more cell bitrate is the more is the part of tx_bytes that
   gets associated to this cell.

In basic implementation the heuristic would be to divide tx_bytes as

	tx_bytes(cell) = tx_bytes·β/Σcells(β)		; β is bitrate of a cell

but given that for every frame _BitSync works by computing things based
on neighbour frame as well we do it as

	tx_bytes(cell) = tx_bytes·(β₁+β₂)/Σcells(β₁+β₂)

This should make the heuristic a bit more stable.

This patch comes with tx_bytes splitter filter itself only. In the next
patch we will use _CTXBytesSplitter to implement multicell-awareness for
_BitSync.
parent 91967123
...@@ -152,6 +152,15 @@ class _BitSync: ...@@ -152,6 +152,15 @@ class _BitSync:
'i_lshift', # next left shift will be done on txv[i_lshift] <- txv[i_lshift+1] 'i_lshift', # next left shift will be done on txv[i_lshift] <- txv[i_lshift+1]
) )
# _CTXBytesSplitter will serve _BitSync by spliting total tx_bytes into per-cell parts.
#
# .next(δt, tx_bytes, {C → #tx, bitrate}) -> [](δt', {C → #tx, ctx_bytes})
# .finish() -> [](δt', {C → #tx, ctx_bytes})
class _CTXBytesSplitter:
__slots__ = (
'txq', # [](δt, tx_bytes, _Utx)
)
# Sampler() creates new sampler that will start sampling from ue_stats0/stats0 state. # Sampler() creates new sampler that will start sampling from ue_stats0/stats0 state.
@func(Sampler) @func(Sampler)
...@@ -234,6 +243,9 @@ class _UCtx: # UE transmission state on particular cell ...@@ -234,6 +243,9 @@ class _UCtx: # UE transmission state on particular cell
'bitrate', 'bitrate',
'rank', 'rank',
'xl_use_avg', 'xl_use_avg',
# tx_bytes is per-cell part of total tx_bytes estimated by _CTXBytesSplitter
'tx_bytes', # initially set to None
) )
@func(_Sampler) @func(_Sampler)
...@@ -276,6 +288,8 @@ def add(s, ue_stats, stats, init=False): ...@@ -276,6 +288,8 @@ def add(s, ue_stats, stats, init=False):
uc.rank = cell['ri'] if s.use_ri else 1 uc.rank = cell['ri'] if s.use_ri else 1
uc.xl_use_avg = scell['%s_use_avg' % s.dir] uc.xl_use_avg = scell['%s_use_avg' % s.dir]
uc.tx_bytes = None
ue = s.ues.get(ue_id) ue = s.ues.get(ue_id)
if ue is None: if ue is None:
ue = s.ues[ue_id] = _UE(s.use_bitsync) ue = s.ues[ue_id] = _UE(s.use_bitsync)
...@@ -650,6 +664,71 @@ def _rebalance(s, l): ...@@ -650,6 +664,71 @@ def _rebalance(s, l):
#print(' < rebalance', s.txq[:l]) #print(' < rebalance', s.txq[:l])
# _CTXBytesSplitter creates new empty txsplit.
@func(_CTXBytesSplitter)
def __init__(s):
s.txq = []
# next feeds next (δt, tx_bytes, u) into txsplit.
#
# and returns ready parts of split stream.
@func(_CTXBytesSplitter)
def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', u'+.txbytes)
# split tx_bytes in between cells according to (β₁+β₂)/Σcells(β₁+β₂)
# where βi is cell bandwidth in frame i.
assert len(s.txq) < 2
s.txq.append((δt, tx_bytes, u))
vtx = [] # of (δt', u'+.txbytes)
while len(s.txq) >= 2:
δt, tx_bytes, u1 = s.txq.pop(0)
_, _, u2 = s.txq[0]
Σβ12 = 0
for cell_id, uc1 in u1.cutx.items():
Σβ12 += uc1.bitrate
if cell_id in u2.cutx:
uc2 = u2.cutx[cell_id]
Σβ12 += uc2.bitrate
for cell_id, uc1 in u1.cutx.items():
β12 = uc1.bitrate
uc2 = u2.cutx.get(cell_id)
if uc2 is not None:
β12 += uc2.bitrate
if Σβ12 != 0:
uc1.tx_bytes = tx_bytes * β12 / Σβ12
else:
# should not happen, but divide equally just in case
uc1.tx_bytes = tx_bytes / len(u1.cutx)
vtx.append((δt, u1))
return vtx
# finish tells txsplit to flush its output queue.
#
# txsplit becomes reset.
@func(_CTXBytesSplitter)
def finish(s): # -> [](δt', u'+.txbytes)
assert len(s.txq) < 2
if len(s.txq) == 0:
return []
assert len(s.txq) == 1
# yield last chunk, by appending artificial empty tx frame
zutx = _Utx()
zutx.qtx_bytes = {}
zutx.cutx = {}
vtx = s.next(s.txq[0][0], 0, zutx)
assert len(vtx) == 1
assert len(s.txq) == 1
s.txq = []
return vtx
# __repr__ returns human-readable representation of Sample. # __repr__ returns human-readable representation of Sample.
@func(Sample) @func(Sample)
def __repr__(s): def __repr__(s):
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
from __future__ import print_function, division, absolute_import from __future__ import print_function, division, absolute_import
from xlte.amari.drb import _Sampler, Sample, _BitSync, _Utx, _UCtx, tti, _IncStats from xlte.amari.drb import _Sampler, Sample, _BitSync, _CTXBytesSplitter, _Utx, _UCtx, tti, _IncStats
import numpy as np import numpy as np
from golang import func from golang import func
...@@ -158,6 +158,7 @@ def UCtx(tx, bitrate, rank, xl_use_avg): ...@@ -158,6 +158,7 @@ def UCtx(tx, bitrate, rank, xl_use_avg):
uc.bitrate = bitrate uc.bitrate = bitrate
uc.rank = rank uc.rank = rank
uc.xl_use_avg = xl_use_avg uc.xl_use_avg = xl_use_avg
uc.tx_bytes = None
return uc return uc
...@@ -582,6 +583,113 @@ def test_BitSync(): ...@@ -582,6 +583,113 @@ def test_BitSync():
( 0, 0 )] ( 0, 0 )]
# verify how tx_bytes is partitioned in between cells by _BitSync.
def test_CTXBytesSplitter():
# _ passes txv_in into _CTXBytesSplitter and returns output stream.
#
# txv_in = [](tx_bytes, byterate1, byterate2)
def _(*txv_in):
def _do_txsplit(*txv_in):
txv_out = []
txsplit = _CTXBytesSplitter()
# Utx2 returns _Utx representing transmission on up to two cells.
def Utx2(byterate1, byterate2):
u = _Utx()
u.qtx_bytes = None # not used by _CTXBytesSplitter
u.cutx = {}
if byterate1 is not None:
u.cutx[1] = UCtx(None, 8*byterate1, None, None)
if byterate2 is not None:
u.cutx[2] = UCtx(None, 8*byterate2, None, None)
return u
# t2iter yields result of txsplit .next/.finish in simplified form
# convenient for testing.
def t2iter(_): # -> i[](tx_bytes1, tx_bytes2)
for (δt, u) in _:
assert δt == 10*tti
assert set(u.cutx.keys()).issubset([1,2])
tx_bytes1 = None
tx_bytes2 = None
if 1 in u.cutx:
tx_bytes1 = u.cutx[1].tx_bytes
if 2 in u.cutx:
tx_bytes2 = u.cutx[2].tx_bytes
yield (tx_bytes1, tx_bytes2)
for (tx_bytes, byterate1, byterate2) in txv_in:
_ = txsplit.next(10*tti, tx_bytes, Utx2(byterate1, byterate2))
txv_out += list(t2iter(_))
_ = txsplit.finish()
txv_out += list(t2iter(_))
return txv_out
def do_txsplit(*txv_in):
txv_out = _do_txsplit(*txv_in)
# verify the output is symmetrical in between C1 and C2
xtv_in = list((t, b2, b1) for (t, b1, b2) in txv_in)
xtv_out = _do_txsplit(*xtv_in)
xtv_out_ = list((t1, t2) for (t2, t1) in xtv_out)
assert xtv_out_ == txv_out
return txv_out
txv_out = do_txsplit(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_txsplit(*(txv_in + ((0,0,0),)*10))
assert txv_out_ == txv_out + [(0,0)]*10
return txv_out
# C1 C2 C1 C2
# tx_bytes byterate byterate tx_bytes tx_bytes
# (1 element only)
assert _((1000, 1000, None)) == [(1000, None)] # identity for 1 cell
assert _((1000, 1000, 0)) == [(1000, 0)] # C2.bitrate = 0
assert _((1000, 0, 0)) == [( 500, 500)] # ΣC.bitrate = 0 -> divided equally
# (≥ 2 elements - tests queuing)
assert _((1000, 1000, None), # identity for 1 cell
(2000, 2000, None)) == [(1000, None),
(2000, None)]
assert _((1000, 1000, None), # C2 appears
(2000, 1500, 500),
(2000, 1500, 500),
(2000, 500, 1500)) == [(1000, None),
(1500, 500),
(1000, 1000),
( 500, 1500)]
assert _((2000, 1000, 1000), # C2 disappears
(2000, 1500, 500),
(1000, 500, None),
(1000, 1000, None)) == [(1250, 750),
(1600, 400),
(1000, None),
(1000, None)]
assert _((2000, 0, 0), # ΣC.bitrate = 0
(2000, 0, 0),
(1000, 0, 0),
(1000, 0, 0)) == [(1000, 1000),
(1000, 1000),
( 500, 500),
( 500, 500)]
assert _((2000, 1, 0), # C2.bitrate = 0
(2000, 1, 0),
(1000, 1, 0),
(1000, 1, 0)) == [(2000, 0),
(2000, 0),
(1000, 0),
(1000, 0)]
# ---- misc ---- # ---- misc ----
# teach tests to compare Samples # teach tests to compare Samples
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment