{#- XXX ORS -> provide defaults for: slave_instance_list #} {#- XXX icell vs cell / iru vs ru everyhere + document it #} {#- XXX TDD check #} {#- defaults provide default values for parameters. it should be kept in sync with "default" in json schemas #} {%- set defaults = { 'enb_id': '0x1A2D0', 'gnb_id': '0x12345', 'gnb_id_bits': 28, 'cell': { 'pci': 1, 'tac': "0x0001", }, 'cpri_link': { 'mapping': 'hw', 'mult': 16, 'rx_delay': 0, 'tx_delay': 0, 'tx_dbm': 0, } } %} {#- cfg returns value of configuration parameter name #} {%- macro cfg(name) %} {{- slapparameter_dict.get(name, defaults.get(name)) }} {%- endmacro %} {%- import 'lte.jinja2' as lte -%} {#- J is used around macro calls to retrieve returned objects. It is needed to workaround jinja2 limitation that macro can return only strings - not arbitrary objects: we return objects as JSON-encoded string and J decodes them. By convention macros that return JSON-encoded objects start with "j" prefix. Usage example: set obj = J(jmymacro(...)) #} {%- set J = json_module.loads %} {#- XXX error(slave, msg) vvv is debug stub -#} {%- macro error(slave, msg) %} {%- set msg = 'E: %s: %s\n' % (J(jref_of_shared(slave)), msg) %} {%- do print('\n%s' % msg) %} {%- do assert(False, msg) %} {%- endmacro %} {#- bug indicates an error in template logic. it should not happen. #} {%- macro bug(msg) %} {%- do assert(False, msg) %} {%- endmacro %} {#- jcell_ru_ref returns RU reference linked from a cell. if the cell embeds RU definition, its reference comes as `_<cell_ref>_ru`. #} {%- macro jcell_ru_ref(icell) %} {{- _jcell_ru_ref(icell, []) }} {%- endmacro %} {%- macro _jcell_ru_ref(icell, seen) %} {%- set cell_ref = J(jref_of_shared(icell)) %} {%- if cell_ref in seen %} {%- for x in seen %} {%- do error(x, "%s form a cycle via RU references" % seen) %} {%- endfor %} {#- XXX what to return ? #} {%- else %} {%- do seen.append(cell_ref) %} {%- set ru = icell['_']['ru'] %} {%- if ru['ru_type'] == 'ru_ref' %} {{- ru['ru_ref'] | tojson }} {%- elif ru['ru_type'] == 'ruincell_ref' %} {#- XXX first check referred cell exist ? #} {{- _jcell_ru_ref(icell_dict[ru['ruincell_ref']], seen) }} {%- else %} {#- ru definition is embedded into cell #} {{- ('_%s_ru' % J(jref_of_shared(icell))) | tojson }} {%- endif %} {%- endif %} {%- endmacro %} {#- jref_of_shared returns original reference used to request shared instance. it is extracted from slave_reference which is composed as <partition_id>_<reference>. #} {%- macro jref_of_shared(slave) %} {#- do print('jref_of_shared %r' % (slave,)) #} {%- set ref = slave['slave_reference'] %} {%- set partition_id = slap_configuration['slap-computer-partition-id'] %} {%- if ref.startswith(partition_id) %} {%- set ref = ref[len(partition_id):] %} {%- endif %} {%- if ref.startswith('_') %} {%- set ref = ref[1:] %} {%- endif %} {{- ref | tojson }} {%- endmacro %} {#- icell_dict keeps cell slave instances: reference -> icell iru_dict keeps RU slave instances + RU whose definition is embedded into a cell: reference -> iRU in the kept instances _ is automatically json-decoded #} {%- set icell_dict = {} %} {%- set iru_dict = {} %} {%- for slave in slap_configuration.get('slave-instance-list', []) %} {%- set ref = J(jref_of_shared(slave)) %} {%- set _ = J(slave['_']) %} {%- do slave.update({'_': _}) %} {%- if 'ru_type' in _ %} {%- set iru = slave %} {%- do iru_dict.update({ref: iru}) %} {%- elif 'cell_type' in _ %} {%- set icell = slave %} {%- for k, v in defaults['cell'].items() %} {%- do _.setdefault(k, v) %} {%- endfor %} {%- do icell_dict.update({ref: icell}) %} {%- set ru = _['ru'] %} {%- if ru['ru_type'] not in ('ru_ref', 'ruincell_ref') %} {#- embedded ru definition -> expose it as `_<cell_ref>_ru` #} {%- do iru_dict.update({'_%s_ru' % ref: { '_': ru, 'slave_title': '%s. RU' % icell['slave_title'], 'slave_reference': icell['slave_reference'], }}) %} {%- endif %} {%- else %} {%- do error(slave, "unknown shared instance type") %} {%- endif %} {%- endfor %} {#- do print('\n>>> iru_dict:'), pprint(iru_dict) #} {#- do print('\n>>> icell_dict:'), pprint(icell_dict) #} {#- XXX verify cell_type = lte|nr #} {#- verify that there is no dangling cell->ru references #} {#- XXX also verify that there is no dangling cell -> cell refs in ruincell_ref #} {%- for _, icell in icell_dict|dictsort %} {%- set ru_ref = J(jcell_ru_ref(icell)) %} {%- if ru_ref not in iru_dict %} {%- do error(icell, "referred RU %s does not exist" % ru_ref) %} {%- endif %} {%- endfor %} {#- start of the config -#} { log_options: "all.level=error,all.max_size=0,nas.level=debug,nas.max_size=1,s1ap.level=debug,s1ap.max_size=1,x2ap.level=debug,x2ap.max_size=1,rrc.level=debug,rrc.max_size=1, {%- if slapparameter_dict.get('log_phy_debug', False) -%} phy.level=debug {%- else -%} phy.level=info {%- endif -%} ,file.rotate=1G,file.path=/dev/null", log_filename: "{{ directory['log'] }}/enb.log", {# instantiate radio units #} rf_driver: { {%- if slapparameter_dict.get('disable_sdr', False) %} name: "dummy", {%- else %} name: "sdr", {%- endif %} {%- if slapparameter_dict.get('gps_sync', False) %} sync: "gps", {%- endif %} {%- set dev_argv = [] %} {%- set ru_sdr_dict = {} %} {# dev -> ru for ru with ru_type = sdr #} {%- set ru_cpri_dict = {} %} {# dev -> ru for ru with link_kind = cpri #} {%- for rf_port, (ru_ref, iru) in enumerate(iru_dict|dictsort) %} {%- set ru = iru['_'] %} {%- do ru.update({'.rf_port': rf_port}) %} {%- if ru['ru_type'] == 'sdr' %} {%- do ru_sdr_dict.update({len(dev_argv): ru}) %} {#- XXX move to ru/sdr ? #} {%- for n in ru['sdr_dev_list'] %} {%- do dev_argv.append("dev%d=/dev/sdr%d" % (len(dev_argv), n)) %} {%- endfor %} {%- elif ru['ru_link_type'] == 'cpri' %} {%- do ru_cpri_dict.update({len(dev_argv): ru}) %} {%- set link = ru['cpri_link'] %} {#- set cpri_link parameters to default #} {%- for k, v in defaults['cpri_link'].items() %} {%- do link.setdefault(k, v) %} {%- endfor %} {%- do dev_argv.append("dev%d=/dev/sdr%d@%d" % (len(dev_argv), link['sdr_dev'], link['sfp_port'])) %} {%- else %} {%- do bug('unreachable') %} {%- endif %} {%- endfor %} args: "{{dev_argv | join(',')}}", {#- emit sdr-related options if an sdr ru is present #} {%- if len(ru_sdr_dict) > 0 %} rx_antenna:"tx_rx", {#- XXX -> ru/sdr ? #} tdd_tx_mod: 1, {%- endif %} {#- emit cpri_* optins if a cpri ru is present #} {#- NOTE values for non-cpri links come as empty XXX recheck this is ok #} {%- if len(ru_cpri_dict) > 0 %} {%- set vcpri = [None]*len(dev_argv) %} {%- for dev, ru in ru_cpri_dict|dictsort %} {%- do vcpri.__setitem__(dev, ru['cpri_link']) %} {%- endfor %} cpri_mapping: "{{ vcpri | map(attribute='mapping') | map('default', '') | join(',') }}", cpri_mult: "{{ vcpri | map(attribute='mult') | map('default', '') | join(',') }}", cpri_rx_delay: "{{ vcpri | map(attribute='rx_delay') | map('default', '') | join(',') }}", cpri_tx_delay: "{{ vcpri | map(attribute='tx_delay') | map('default', '') | join(',') }}", cpri_tx_dbm: "{{ vcpri | map(attribute='tx_dbm') | map('default', '') | join(',') }}", {%- set tap_base = slap_configuration.get('tap-name', '') %} {%- set vtap = ['']*len(dev_argv) %} {%- for i, (dev, _) in enumerate(ru_cpri_dict|dictsort) %} {%- if len(ru_cpri_dict) > 1 %} {%- set tap = "%s-%d" % (tap_base, i) %} {%- else %} {%- set tap = tap_base %} {%- endif %} {%- do vtap.__setitem__(dev, tap) %} {%- endfor %} ifname: "{{ vtap | join(',') }}", {%- endif %} }, {# TODO amarisoft doc says tx_gain/rx_gain are per-channel -> allow separate configuration NOTE one RF port can consist of multiple DL and multiple UL channels #} tx_gain: {{ tx_gain }}, rx_gain: {{ rx_gain }}, {%- if slapparameter_dict.get('websocket_password', '') %} com_addr: "[{{ gtp_addr_v6 }}]:9001", com_auth: { password: "{{ slapparameter_dict['websocket_password'] }}", }, {%- else %} com_addr: "127.0.1.2:9001", {%- endif %} mme_list: [ {%- for _, mme in slapparameter_dict.get('mme_list', {'1':{'mme_addr': '127.0.1.100'}}) | dictsort %} { mme_addr: "{{ mme['mme_addr'] }}", }, {%- endfor %} ], {% if slapparameter_dict.get('mme_list', '') %} {% if slapparameter_dict.get('use_ipv4', False) %} gtp_addr: "{{ gtp_addr_v4 }}", {% else %} gtp_addr: "{{ gtp_addr_v6 }}", {% endif %} {% else %} gtp_addr: "127.0.1.1", {% endif %} enb_id: {{ cfg('enb_id') }}, gnb_id: {{ cfg('gnb_id') }}, gnb_id_bits: {{ cfg('gnb_id_bits') }}, en_dc_support: true, // LTE cells cell_list: [ {%- for i, (cell_ref, icell) in enumerate(icell_dict|dictsort) %} {% set cell = icell['_'] %} {%- if cell['cell_type'] == 'lte' %} {%- set ru_ref = J(jcell_ru_ref(icell)) %} {%- set iru = iru_dict[ru_ref] %} {%- set ru = iru['_'] %} // {{ J(jref_of_shared(icell)) }} { rf_port: {{ ru['.rf_port'] }}, n_antenna_dl: {{ ru['n_antenna_dl'] }}, n_antenna_ul: {{ ru['n_antenna_ul'] }}, cell_id: {{ cell['cell_id'] }}, tac: {{ cell['tac'] }}, n_id_cell: {{ cell['pci'] }}, dl_earfcn: {{ cell['dl_earfcn'] }}, root_sequence_index: {{ cell.get('root_sequence_index', slapparameter_dict.get('root_sequence_index', 204 + i)) }}, // Handover ncell_list: [ // Intra-ENB HO {%- for cell2_ref, icell2 in icell_dict|dictsort %} {%- set cell2 = icell2['_'] %} {%- if cell2_ref != cell_ref %} {#- NOTE: HO to both LTE and NR #} { {%- if cell2['cell_type'] == 'lte' %} rat: "eutra", cell_id: {{ cfg('enb_id')}}{{ cell2['cell_id'] }}, // -> {{ J(jref_of_shared(icell2)) }} n_id_cell: {{ cell2['pci'] }}, dl_earfcn: {{ cell2['dl_earfcn'] }}, tac: {{ cell2['tac'] }}, {%- elif cell2['cell_type'] == 'nr' %} rat: "nr", nr_cell_id: {{ cfg('gnb_id')}}{{ cell2['cell_id'] }}, // -> {{ J(jref_of_shared(icell2)) }} n_id_cell: {{ cell2['pci'] }}, gnb_id_bits dl_nr_arfcn ssb_nr_arfcn ul_nr_arfcn tac band ssb_subcarrier_spacing: 30, ssb_period: 20, ssb_offset: 0, ssb_duration: 1, {%- else %} {%- do bug('unreachable') %} {%- endif %} }, {%- endif %} {%- endfor %} // Inter-ENB HO {#- TODO: add info about neighbours as shared instances - one instance per neigh *ENB*. then query SlapOS Master about cells configured on that neigh ENB and put them as neighbours here #} ], // Carrier Aggregation {#- CA in between all LTE cells #} // XXX + CA LTE-NR scell_list: [ {%- for cell2_ref, icell2 in icell_dict|dictsort %} {%- set cell2 = icell2['_'] %} {%- if cell2_ref != cell_ref and cell2['cell_type'] == 'lte' %} { cell_id: {{ cell2['cell_id'] }}, // + {{ J(jref_of_shared(icell2)) }} cross_carrier_scheduling: false, }, {%- endif %} {%- endfor %} ], {# tune LTE parameters for the cell #} {%- set tdd = (cell['rf_mode'] == 'tdd') %} {%- if tdd %} uldl_config: {{ lte.cell_tdd_config(cell) }}, sp_config: 7, {%- endif %} {%- set n_rb_dl = lte.cell_n_rb_dl(cell) %} n_rb_dl: {{ n_rb_dl }}, si_coderate: {{ 0.30 if n_rb_dl == 6 else 0.20 }}, pdsch_dedicated: { p_a: {{ {4: -6, 2: -3}.get(ru['n_antenna_dl'], 0) }}, p_b: -1, }, pdcch_format: {{ 1 if n_rb_dl == 6 else 2 }}, prach_config_index: {{ 15 if n_rb_dl == 6 else 4 }}, initial_cqi: {{ 5 if n_rb_dl == 6 else 3 }}, pucch_dedicated: { n1_pucch_sr_count: 11, cqi_pucch_n_rb: 1, n1_pucch_an_cs_count: 8, {# XXX else "n1_pucch_an_cs_count must be > 0 for the CA Primary cell" #} n3_pucch_an_n_rb: 3, {# XXX else "n3_pucch_an_n_rb must be > 0 for the CA Primary cell" #} {%- if tdd %} tdd_ack_nack_feedback_mode: "multiplexing", /* TDD only */ {%- endif %} }, {%- if ru['n_antenna_dl'] >= 2 %} m_ri: 8, transmission_mode: 3, {%- endif %} srs_dedicated: { {%- if n_rb_dl == 6 %} srs_bandwidth_config: 7, srs_bandwidth: 1, {%- elif n_rb_dl == 15 %} srs_bandwidth_config: 6, srs_bandwidth: 1, {%- elif n_rb_dl == 25 %} srs_bandwidth_config: 3, srs_bandwidth: 1, {%- elif n_rb_dl == 50 %} srs_bandwidth_config: 2, srs_bandwidth: 2, {%- elif n_rb_dl == 75 %} srs_bandwidth_config: 2, srs_bandwidth: 2, {%- else %} srs_bandwidth_config: 2, srs_bandwidth: 3, {%- endif %} srs_subframe_config: 3, srs_period: 40, srs_hopping_bandwidth: 0, }, {#- XXX do we want to differentiate whether it is ORS or Amarisoft SDR board ? #} {%- if ru['ru_type'] == 'sdr' %} manual_ref_signal_power: true, {%- endif %} drb_config: {{ lte.cell_drb_config(cell) }} }, {%- endif %} {%- endfor %} ], cell_default: { plmn_list: [ {%- if slapparameter_dict.get('plmn_list', '') %} {%- for i, k in enumerate(slapparameter_dict['plmn_list']) %} {%- if i == 0 -%} { {%- else -%} , { {%- endif %} plmn: "{{ slapparameter_dict['plmn_list'][k]['plmn'] }}", reserved: {{ str(slapparameter_dict['plmn_list'][k].get('reserved', false)).lower() }}, attach_without_pdn: {{ str(slapparameter_dict['plmn_list'][k].get('attach_without_pdn', false)).lower() }}, } {%- endfor -%} {% else %} "00101", {% endif %} ], cyclic_prefix: "normal", phich_duration: "normal", phich_resource: "1", si_value_tag: 0, cell_barred: false, intra_freq_reselection: true, q_rx_lev_min: -70, si_window_length: 40, sib_sched_list: [ { filename: "{{ sib23_file }}", si_periodicity: 16, }, ], si_pdcch_format: 2, n_symb_cch: 0, prach_freq_offset: -1, pusch_dedicated: { beta_offset_ack_index: 9, beta_offset_ri_index: 6, beta_offset_cqi_index: 6, }, pusch_hopping_offset: -1, pusch_msg3_mcs: 0, dl_256qam: true, ul_64qam: true, sr_period: 20, cqi_period: 40, mac_config: { ul_max_harq_tx: 5, dl_max_harq_tx: 5, }, pusch_max_its: 6, dpc: true, dpc_pusch_snr_target: 25, dpc_pucch_snr_target: 20, cipher_algo_pref: [], integ_algo_pref: [2, 1], // XXX -> cell ? inactivity_timer: {{ slapparameter_dict.get('inactivity_timer', slap_configuration['configuration.default_lte_inactivity_timer']) }}, srb_config: [ { id: 1, maxRetxThreshold: 32, t_Reordering: 45, t_PollRetransmit: 60, }, { id: 2 , maxRetxThreshold: 32, t_Reordering: 45, t_PollRetransmit: 60, } ], meas_config_desc: { a1_report_type: "rsrp", a1_rsrp: -70, a1_hysteresis: 0, a1_time_to_trigger: 640, a2_report_type: "rsrp", a2_rsrp: -80, a2_hysteresis: 0, a2_time_to_trigger: 640, a3_report_type: "rsrp", a3_offset: {{ slapparameter_dict.get('lte_handover_a3_offset', 6) }}, a3_hysteresis: 0, a3_time_to_trigger: {{ slapparameter_dict.get('lte_handover_a3_time_to_trigger', 480) }}, }, meas_gap_config: "gp0", ho_from_meas: true, }, // NR cells (TODO) {#- XXX + nr_cell_list #} {#- XXX + nr_cell_default #} }