Commit 0ea04a2b authored by Joanne Hugé's avatar Joanne Hugé

Clean up, bug fixes, add frame tracer and analyzer

parent 7b4b19a8
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
/* Note: src and dst must be 16 byte aligned */
static void float_to_int16(int16_t *dst, const float *src, int n, float mult)
{
const __m128 *p;
__m128i *q, a0, a1;
__m128 mult1;
mult1 = _mm_set1_ps(mult);
p = (const void *)src;
q = (void *)dst;
while (n >= 16) {
a0 = _mm_cvtps_epi32(p[0] * mult1);
a1 = _mm_cvtps_epi32(p[1] * mult1);
q[0] = _mm_packs_epi32(a0, a1);
a0 = _mm_cvtps_epi32(p[2] * mult1);
a1 = _mm_cvtps_epi32(p[3] * mult1);
q[1] = _mm_packs_epi32(a0, a1);
p += 4;
q += 2;
n -= 16;
}
if (n >= 8) {
a0 = _mm_cvtps_epi32(p[0] * mult1);
a1 = _mm_cvtps_epi32(p[1] * mult1);
q[0] = _mm_packs_epi32(a0, a1);
p += 2;
q += 1;
n -= 8;
}
if (n != 0) {
/* remaining samples (n <= 7) */
do {
a0 = _mm_cvtps_epi32(_mm_load_ss((float *)p) * mult);
*(int16_t *)q = _mm_cvtsi128_si32 (_mm_packs_epi32(a0, a0));
p = (__m128 *)((float *)p + 1);
q = (__m128i *)((int16_t *)q + 1);
n--;
} while (n != 0);
}
}
/* Note: src and dst must be 16 byte aligned */
static void int16_to_float(float *dst, const int16_t *src, int len, float mult)
{
__m128i a0, a1, a, b, sign;
__m128 mult1;
mult1 = _mm_set1_ps(mult);
while (len >= 8) {
a = *(__m128i *)&src[0];
#ifdef SSE4
a0 = _mm_cvtepi16_epi32(a);
#else
// Fix for CPU without SSE4.1
a0 = _mm_unpacklo_epi16(a, a);
a0 = _mm_srai_epi32(a0, 16);
#endif
b = _mm_srli_si128(a, 8);
#ifdef SSE4
a1 = _mm_cvtepi16_epi32(b);
#else
a1 = _mm_unpacklo_epi16(b, b);
a1 = _mm_srai_epi32(a1, 16);
#endif
*(__m128 *)&dst[0] = _mm_cvtepi32_ps(a0) * mult1;
*(__m128 *)&dst[4] = _mm_cvtepi32_ps(a1) * mult1;
dst += 8;
src += 8;
len -= 8;
}
/* remaining data */
while (len != 0) {
_mm_store_ss(&dst[0], _mm_cvtsi32_ss(_mm_setzero_ps(), src[0]) * mult1);
dst++;
src++;
len--;
}
}
/* Compr_bf1 */
static inline int max_int(int a, int b)
{
if (a > b)
return a;
else
return b;
}
static inline int min_int(int a, int b)
{
if (a < b)
return a;
else
return b;
}
static inline int clamp_int(int val, int min_val, int max_val)
{
if (val < min_val)
return min_val;
else if (val > max_val)
return max_val;
else
return val;
}
/* 1 <= n_bits <= 7 */
static void encode_put_bits(uint8_t *buf, int bit_offset, int n_bits, unsigned int val)
{
int shift, n;
shift = bit_offset & 7;
buf[bit_offset >> 3] |= val << shift;
n = 8 - shift;
if (n > 0)
buf[(bit_offset >> 3) + 1] |= val >> n;
}
/* 1 <= n_bits <= 7 */
static int decode_get_bits(const uint8_t *buf, int bit_offset, int n_bits)
{
int shift, n, v;
shift = bit_offset & 7;
n = 8 - shift;
v = buf[bit_offset >> 3];
if (n_bits > n)
v |= buf[(bit_offset >> 3) + 1] << 8;
return (v >> shift) & ((1 << n_bits) - 1);
}
#define BLOCK_LEN 8
#define CMULT_BITS 7
#define CMULT 181
/* CPRI frame format for 122.88 MHz with 4 channels using bit rate
option 7 (9.8304 Gb/s):
data byte offset
0 Channel 0. One BF1 block containing I0, Q0, I1, Q1, ..., I31, Q31.
60 Channel 1
120 Channel 2
180 Channel 3
*/
/* take 64 * 16 bit I or Q values as input and return 60 bytes into obuf.
When using a smaller sample bit width, the samples should be
appropriately scaled (e.g. multiplied by 2 for a bit width =
15). */
static void encode_bf1(uint8_t *obuf, const int16_t *samples)
{
int e, vmax, v, block_num, i, e1, m;
memset(obuf, 0, 4 + 7 * BLOCK_LEN);
/* the compressed data contains 8 blocks of 8 I or Q components. Each
block is compressed independently but the corresponding values
and output bits are interleaved. */
for(block_num = 0; block_num < 8; block_num++) {
vmax = 0;
for(i = 0; i < BLOCK_LEN; i++) {
vmax = max_int(vmax, abs(samples[i * 8 + block_num]));
}
vmax = min_int(vmax, (1 << 15) - 1);
/* 0 <= vmax <= 2^15-1 */
e = 8;
while (vmax < (1 << 14) && e > 0) {
vmax <<= 1;
e--;
}
if (e <= 1) {
e1 = e;
m = 1;
} else {
m = (vmax >= (CMULT << (14 - CMULT_BITS)));
e1 = e * 2 + m - 2;
}
encode_put_bits(obuf, block_num * 32, 4, e1); /* store the exponent */
for(i = 0; i < BLOCK_LEN; i++) {
v = samples[i * 8 + block_num];
v = v << (8 - e);
if (!m)
v = v * CMULT;
else
v = v << CMULT_BITS;
/* quantize on 7 bits using two's complement notation */
v = (v + (1 << (8 + CMULT_BITS))) >> (9 + CMULT_BITS);
v = clamp_int(v, -64, 63);
v = v & 0x7f;
if (i < 4)
encode_put_bits(obuf, block_num * 32 + 4 + i * 7, 7, v);
else
encode_put_bits(obuf, 8 * 32 + block_num * 28 + (i - 4) * 7, 7, v);
}
}
}
/* Take 60 bytes as input and return 64 values of sample_bit_width bits
(14 <= sample_bit_width <= 16) */
static void decode_bf1(int16_t *samples, const uint8_t *ibuf, int sample_bit_width)
{
int e, v, e1, m, shift, bias;
int block_num, i;
shift = (15 - sample_bit_width) + CMULT_BITS + 1;
bias = (1 << shift) >> 1;
for(block_num = 0; block_num < 8; block_num++) {
e1 = decode_get_bits(ibuf, block_num * 32, 4);
if (e1 <= 1) {
e = e1;
m = 1;
} else {
e1 += 2;
m = e1 & 1;
e = e1 >> 1;
}
for(i = 0; i < BLOCK_LEN; i++) {
if (i < 4)
v = decode_get_bits(ibuf, block_num * 32 + 4 + i * 7, 7);
else
v = decode_get_bits(ibuf, 8 * 32 + block_num * 28 + (i - 4) * 7, 7);
/* sign extend two's complement 7 bit value to 32 bits */
v = (int32_t)((uint32_t)v << (32 - 7)) >> (32 - 7);
/* multiply by the scale */
v = v << e;
if (!m)
v = v * CMULT;
else
v = v << (CMULT_BITS + 1);
/* shift and round */
v = (v + bias) >> shift;
samples[i * 8 + block_num] = v;
}
}
}
char in[256];
char out[256];
int main(int argc, char ** argv) {
int compress, tx;
int16_t int16_samples[64];
const float mult = 1. / 32767.;
if(argc != 2)
fprintf(stderr, "Wrong argument number");
compress = (argv[1][0] == 'c');
freopen(NULL, "rb", stdin);
fread(in, compress ? (64 * 4) : 60 , 1, stdin);
fclose(stdin);
if(compress) {
float_to_int16(int16_samples, (float *) in, 64, 32767);
encode_bf1(out, int16_samples);
} else {
decode_bf1(int16_samples, in, 16);
int16_to_float((float *) out, int16_samples, 64, mult);
}
freopen(NULL, "wb", stdout);
fwrite(out, compress ? 60 : (64 * 4), 1, stdout);
fclose(stdout);
return 0;
}
import subprocess
import struct
import random
def float_to_byte(float_list):
iq_samples_bytes = bytearray(64 * 4)
for i in range(64):
b = struct.pack('f', float_list[i])
for j in range(4):
iq_samples_bytes[i * 4 + j] = b[j]
return bytes(iq_samples_bytes)
def byte_to_float(b):
float_list = []
for i in range(64):
x = struct.unpack('f', b[4 * i:4 * (i+1)])[0]
float_list.append(x)
return float_list
def compress(b):
s = subprocess.run(
["./bf1", "c"],
input=b,
capture_output=True,
)
return s.stdout
def decompress(b):
s = subprocess.run(["./bf1", "d"],
input=b,
capture_output=True,
)
return s.stdout
iq_samples = [0.2 * (random.randint(1,1000) / 1000.0) for i in range(64)]
iq_samples2 = byte_to_float(decompress(compress(float_to_byte(iq_samples))))
d = []
for i in range(64):
d.append(abs(iq_samples[i]-iq_samples2[i]))
print("{}% accuracy".format(100 - 100 * max(d) / max(map(abs, iq_samples))))
/* DPDK */
#define BURST_SIZE 16
#define MEMPOOL_CACHE_SIZE 256
#define RTE_TEST_RX_DESC_DEFAULT 1024
#define RTE_TEST_TX_DESC_DEFAULT 1024
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
struct rte_mempool *mbuf_pool;
struct rte_ether_addr s_addr;
struct rte_ether_addr d_addr;
int8_t tx_data[BURST_SIZE][TX_PACKET_SIZE];
static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_lro_pkt_size = RTE_ETHER_MAX_LEN }
};
static inline int port_init(int portid, struct rte_mempool *mbuf_pool) {
struct rte_eth_conf port_conf = port_conf_default;
const uint16_t rx_rings = 1, tx_rings = 1;
int retval;
uint16_t q;
retval = rte_eth_dev_configure(portid, rx_rings, tx_rings, &port_conf);
if (retval != 0)
return retval;
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (retval < 0)
return retval;
}
/* Allocate and set up 1 TX queue per Ethernet port. */
for (q = 0; q < tx_rings; q++) {
retval = rte_eth_tx_queue_setup(portid, q, nb_txd,
rte_eth_dev_socket_id(portid), NULL);
if (retval < 0)
return retval;
}
/* Start the Ethernet port. */
retval = rte_eth_dev_start(portid);
if (retval < 0)
return retval;
return 0;
}
static void init_dpdk(int argc, char ** argv) {
unsigned int nb_mbufs;
int ret;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "initlize fail!");
argc -= ret;
argv += ret;
nb_mbufs = RTE_MAX((nb_rxd + nb_txd + BURST_SIZE + MEMPOOL_CACHE_SIZE), 8192U);
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
if (port_init(0, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 0);
}
/* DRB configuration for each 5QI value.
5QI characteristics in TS 23.501 table 5.7.4-1.
The qci parameter is used for the 5QI value */
#ifndef EPS_FALLBACK
#define EPS_FALLBACK 0
#endif
[
/**************************************** GBR */
{
qci: 1, /* UM - real time (RTP for VOIP) */
use_for_en_dc: false,
#if EPS_FALLBACK > 0
trigger_eps_fallback: true,
#endif
ims_dedicated_bearer: true,
pdcp_config: {
discardTimer: 100, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 12,
pdcp_SN_SizeDL: 12,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
/* ROHC header compression */
/*
headerCompression: {
maxCID: 15,
profile0x0001: true, // RTP profile
profile0x0002: true, // UDP profile
profile0x0004: false, // IP profile
},
*/
},
rlc_config: {
ul_um: {
sn_FieldLength: 6,
},
dl_um: {
sn_FieldLength: 6,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 7,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 1,
},
},
{
qci: 2, /* UM - real time (video) */
use_for_en_dc: false,
#if EPS_FALLBACK > 0
trigger_eps_fallback: true,
#endif
ims_dedicated_bearer: true,
pdcp_config: {
discardTimer: 150, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
},
rlc_config: {
ul_um: {
sn_FieldLength: 12,
},
dl_um: {
sn_FieldLength: 12,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 8,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 1,
},
},
{
qci: 3, /* UM - real time (gaming) */
pdcp_config: {
discardTimer: 100, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
},
rlc_config: {
ul_um: {
sn_FieldLength: 12,
},
dl_um: {
sn_FieldLength: 12,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 7,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 2,
},
},
{
qci: 4, /* AM - Non-Conversational Video (Buffered Streaming) */
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 9,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 3,
},
},
{
qci: 65, /* UM - real time (MC-PTT voice) */
use_for_en_dc: false,
#if EPS_FALLBACK > 0
trigger_eps_fallback: true,
#endif
ims_dedicated_bearer: true,
pdcp_config: {
discardTimer: 100, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 12,
pdcp_SN_SizeDL: 12,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
/* ROHC header compression */
/*
headerCompression: {
maxCID: 15,
profile0x0001: true, // RTP profile
profile0x0002: true, // UDP profile
profile0x0004: false, // IP profile
},
*/
},
rlc_config: {
ul_um: {
sn_FieldLength: 6,
},
dl_um: {
sn_FieldLength: 6,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 5,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 4,
},
},
{
qci: 66, /* UM - real time (non MC-PTT voice) */
use_for_en_dc: false,
#if EPS_FALLBACK > 0
trigger_eps_fallback: true,
#endif
ims_dedicated_bearer: true,
pdcp_config: {
discardTimer: 150, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
},
rlc_config: {
ul_um: {
sn_FieldLength: 12,
},
dl_um: {
sn_FieldLength: 12,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 7,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 4,
},
},
{
qci: 67, /* UM - Mission Critical Video user plane */
use_for_en_dc: false,
#if EPS_FALLBACK > 0
trigger_eps_fallback: true,
#endif
ims_dedicated_bearer: true,
pdcp_config: {
discardTimer: 100, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
},
rlc_config: {
ul_um: {
sn_FieldLength: 12,
},
dl_um: {
sn_FieldLength: 12,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 6,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 5,
},
},
/**************************************** non GBR */
{
qci: 5, /* AM - high priority (SIP) */
use_for_en_dc: false,
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 6,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 4,
},
},
{
qci: 6, /* AM - Video (buffered streaming) */
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 10,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 5,
},
},
{
qci: 7, /* UM - voice, video (live streaming), interactive gaming */
pdcp_config: {
discardTimer: 100, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: false,
outOfOrderDelivery: false,
t_Reordering: 0,
},
rlc_config: {
ul_um: {
sn_FieldLength: 12,
},
dl_um: {
sn_FieldLength: 12,
t_Reassembly: 50,
},
},
logical_channel_config: {
priority: 11,
prioritisedBitRate: 0, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 6,
},
},
{
qci: 8, /* AM - best effort (Internet traffic) */
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 12,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 7,
},
},
{
qci: 9, /* AM - best effort (Internet traffic) */
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 13,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 7,
},
},
{
qci: 69, /* AM - high priority (MC-PTT signalling) */
use_for_en_dc: false,
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 4,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 4,
},
},
{
qci: 70, /* AM - MC data */
pdcp_config: {
discardTimer: 0, /* in ms, 0 means infinity */
pdcp_SN_SizeUL: 18,
pdcp_SN_SizeDL: 18,
statusReportRequired: true,
outOfOrderDelivery: false,
},
rlc_config: {
ul_am: {
sn_FieldLength: 18,
t_PollRetransmit: 80, /* in ms */
pollPDU: 64,
pollByte: 125, /* in kBytes, 0 means infinity */
maxRetxThreshold: 4,
},
dl_am: {
sn_FieldLength: 18,
t_Reassembly: 80, /* in ms */
t_StatusProhibit: 10, /* in ms */
},
},
logical_channel_config: {
priority: 11,
prioritisedBitRate: 8, /* in kb/s, -1 means infinity */
bucketSizeDuration: 100, /* in ms */
logicalChannelGroup: 5,
},
},
]
...@@ -39,14 +39,13 @@ rf_driver: { ...@@ -39,14 +39,13 @@ rf_driver: {
dpdk_options: "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.0 ", dpdk_options: "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.0 ",
recv_affinity: 39, recv_affinity: 39,
send_affinity: 38, send_affinity: 38,
prepare_affinity: 37, encode_affinity: 37,
decompress_affinity: 36, decode_affinity: 36,
statistic_affinity: 35, statistic_affinity: 35,
ecpri_period: 800, ecpri_period: 800,
flow_id: 0, flow_id: 0,
trace_period: 10000000, trace_period: 10000000,
trace_file: "/root/ecpri-logs/rx.trace", log_directory: "/root/ecpri-logs",
stats_file: "/root/ecpri-logs/ecpri.stats",
}, },
tx_gain: 90.0, /* TX gain (in dB) */ tx_gain: 90.0, /* TX gain (in dB) */
......
/* lteenb configuration file version ##VERSION##
* Copyright (C) 2019-2021 Amarisoft
* NR SA FDD or TDD cell */
#define TDD 1 // Values: 0 (NR FDD), 1(NR TDD)
#define TDD_CONFIG 2 // Values: 1, 2 or 3
#define N_ANTENNA_DL 4 // Values: 1 (SISO), 2 (MIMO 2x2), 4 (MIMO 4x4)
#define N_ANTENNA_UL 1 // Values: 1, 2, 4
#define BANDWIDTH 100 // NR cell bandwidth
#define CPRI 1
#define NR_TEST_MODE -1
/* define to 1 to enable periodic SRS with N_ANTENNA_UL ports. Uplink
SU-MIMO is also enabled if N_ANTENNA_UL >= 2. Not all UEs support
uplink SU-MIMO. */
#define USE_SRS 0
{
//log_options: "all.level=debug,all.max_size=1",
log_options: "all.level=error,all.max_size=0,nas.level=debug,nas.max_size=1,ngap.level=debug,ngap.max_size=1,xnap.level=debug,xnap.max_size=1,rrc.level=debug,rrc.max_size=1",
log_filename: "/tmp/gnb0.log",
/* Enable remote API and Web interface */
com_addr: "0.0.0.0:9001",
rf_driver: {
name: "ecpri",
rec_mac: "b8:59:9f:07:86:42",
re_mac: "04:09:a5:0f:9f:4a", /* HFR Switch */
rec_if: "ens9f0",
dpdk_options: "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.1 ",
//rec_mac: "b8:59:9f:07:86:43",
//re_mac: "b8:59:9f:07:7e:2b", /* Tiogapass003 */
//rec_if: "ens9f1",
//dpdk_options: "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.0 ",
recv_affinity: 39,
send_affinity: 38,
encode_affinity: 37,
decode_affinity: 36,
statistic_affinity: 35,
ecpri_period: 800,
flow_id: 0,
trace_period: 10000000,
log_directory: "/root/ecpri-logs",
},
tx_gain: 90.0, /* TX gain (in dB) */
rx_gain: 60.0, /* RX gain (in dB) */
sample_rate: 122.88, /* MHz */
amf_list: [
{
/* address of AMF for NGAP connection. Must be modified if the AMF runs on a different host. */
amf_addr: "127.0.1.100",
},
],
/* GTP bind address (=address of the ethernet interface connected to
the AMF). Must be modified if the AMF runs on a different host. */
gtp_addr: "127.0.1.1",
gnb_id_bits: 28,
gnb_id: 0x12345,
nr_support: true,
/* list of cells */
cell_list: [],
nr_cell_list: [
{
rf_port: 0,
cell_id: 0x01,
#if TDD == 1
band: 78,
//dl_nr_arfcn: 632628, /* 3489.42 MHz */
dl_nr_arfcn: 640000, /* For Sunwave CBRS RRH: 3600 MHz */
#else
band: 7,
dl_nr_arfcn: 536020, /* 2680 MHz */
ssb_subcarrier_spacing: 15,
#endif
},
], /* nr_cell_list */
nr_cell_default: {
subcarrier_spacing: 30, /* kHz */
bandwidth: BANDWIDTH, /* MHz */
n_antenna_dl: N_ANTENNA_DL,
n_antenna_ul: N_ANTENNA_UL,
/* force the timing TA offset (optional) */
// n_timing_advance_offset: 39936,
#if TDD == 1
tdd_ul_dl_config: {
pattern1: {
#if TDD_CONFIG == 1
period: 5, /* in ms */
dl_slots: 7,
dl_symbols: /* 6 */ 2,
ul_slots: 2,
ul_symbols: 2,
#elif TDD_CONFIG == 2 // Sunwave conf: Case C DDDDDDDSUU DDDDDDDSUU 6:4:4
period: 5, /* in ms */
dl_slots: 7,
dl_symbols: 6,
ul_slots: 2,
ul_symbols: 4,
#elif TDD_CONFIG == 3
period: 5, /* in ms */
dl_slots: 6,
dl_symbols: 2,
ul_slots: 3,
ul_symbols: 2,
#endif
},
},
ssb_pos_bitmap: "10000000",
#else
ssb_pos_bitmap: "1000",
#endif
ssb_period: 20, /* in ms */
n_id_cell: 500,
plmn_list: [ {
tac: 100,
plmn: "00101",
reserved: false,
nssai: [
{
sst: 1,
},
/*{
sst: 2,
},
{
sst: 3,
sd: 50,
},*/
],
},
],
/*sib_sched_list: [
{
filename: "sib2_nr.asn",
si_periodicity: 16,
},
{
filename: "sib3_nr.asn",
si_periodicity: 16,
},
{
filename: "sib4_nr.asn",
si_periodicity: 32,
},
],
sib9: {
si_periodicity: 32
},*/
si_window_length: 40,
cell_barred: false,
intra_freq_reselection: true,
q_rx_lev_min: -70,
q_qual_min: -20,
p_max: 10, /* dBm */
root_sequence_index: 1, /* PRACH root sequence index */
/* Scheduling request period (slots). */
sr_period: 40,
dmrs_type_a_pos: 2,
/* to limit the number of HARQ feedback in UL, use pdsch_harq_ack_max;
allows to workaround issues with SM-G977N for example */
//pdsch_harq_ack_max: 2,
prach: {
#if TDD == 1
prach_config_index: 160, /* format B4, subframe 9 */
msg1_subcarrier_spacing: 30, /* kHz */
#else
prach_config_index: 16, /* subframe 1 every frame */
#endif
msg1_fdm: 1,
msg1_frequency_start: 0,
zero_correlation_zone_config: 15,
preamble_received_target_power: -110, /* in dBm */
preamble_trans_max: 7,
power_ramping_step: 4, /* in dB */
ra_response_window: 20, /* in slots */
restricted_set_config: "unrestricted_set",
ra_contention_resolution_timer: 64, /* in ms */
ssb_per_prach_occasion: 1,
cb_preambles_per_ssb: 8,
},
pdcch: {
n_rb_coreset0: 48,
n_symb_coreset0: 1,
search_space0_index: 0,
dedicated_coreset: {
rb_start: -1, /* -1 to have the maximum bandwidth */
l_crb: -1, /* -1 means all the bandwidth */
duration: 1,
precoder_granularity: "sameAsREG_bundle",
},
css: {
n_candidates: [ 0, 0, 1, 0, 0 ],
},
rar_al_index: 2,
si_al_index: 2,
uss: {
n_candidates: [ 0, 2, 1, 0, 0 ],
dci_0_1_and_1_1: true,
},
al_index: 1,
},
pdsch: {
mapping_type: "typeA",
dmrs_add_pos: 1,
dmrs_type: 1,
dmrs_max_len: 1,
k0: 0, /* delay in slots from DCI to PDSCH */
/* delay in slots from PDSCH to PUCCH/PUSCH ACK/NACK */
#if TDD == 1
#if TDD_CONFIG == 1
k1: [ 8, 7, 7, 6, 5, 4, 12 /* , 11 */ ],
#elif TDD_CONFIG == 2
k1: [ 8, 7, 7, 6, 5, 4, 12, 11 ],
#elif TDD_CONFIG == 3
k1: [ 7, 6, 6, 5, 5, 4 ],
#endif
#else
k1: 4,
#endif
mcs_table: "qam256",
rar_mcs: 2,
si_mcs: 6,
/* If defined, force the PDSCH MCS for all UEs. Otherwise it is computed
* based on DL channel quality estimation */
/* mcs: 24, */
#if NR_TEST_MODE != -1
/* hardcoded scheduling parameters */
n_layer: N_ANTENNA_DL,
#if N_ANTENNA_DL >= 4
n_dmrs_cdm_groups: 2,
#else
n_dmrs_cdm_groups: 1,
#endif
/* If defined, force the PDSCH MCS for all UEs. Otherwise it is computed
* based on DL channel quality estimation */
mcs: 28,
fer: 0,
#endif
},
csi_rs: {
nzp_csi_rs_resource: [
{
csi_rs_id: 0,
#if N_ANTENNA_DL == 1
n_ports: 1,
frequency_domain_allocation: "row2",
bitmap: "100000000000",
cdm_type: "no_cdm",
#elif N_ANTENNA_DL == 2
n_ports: 2,
frequency_domain_allocation: "other",
bitmap: "100000",
cdm_type: "fd_cdm2",
#elif N_ANTENNA_DL == 4
n_ports: 4,
frequency_domain_allocation: "row4",
bitmap: "100",
cdm_type: "fd_cdm2",
#elif N_ANTENNA_DL == 8
n_ports: 8,
frequency_domain_allocation: "other",
bitmap: "110011",
cdm_type: "fd_cdm2",
#else
#error unsupported number of DL antennas
#endif
density: 1,
first_symb: 4,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
power_control_offset: 0, /* dB */
power_control_offset_ss: 0, /* dB */
period: 80,
offset: 1, /* != 0 to avoid collision with SSB */
qcl_info_periodic_csi_rs: 0,
},
#define USE_TRS
#ifdef USE_TRS
/* TRS : period of 40 ms, slots 1 & 2, symbols 4 and 8 */
{
csi_rs_id: 1,
n_ports: 1,
frequency_domain_allocation: "row1",
bitmap: "0001",
cdm_type: "no_cdm",
density: 3,
first_symb: 4,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
power_control_offset: 0, /* dB */
power_control_offset_ss: 0, /* dB */
period: 40,
offset: 11,
qcl_info_periodic_csi_rs: 0,
},
{
csi_rs_id: 2,
n_ports: 1,
frequency_domain_allocation: "row1",
bitmap: "0001",
cdm_type: "no_cdm",
density: 3,
first_symb: 8,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
power_control_offset: 0, /* dB */
power_control_offset_ss: 0, /* dB */
period: 40,
offset: 11,
qcl_info_periodic_csi_rs: 0,
},
{
csi_rs_id: 3,
n_ports: 1,
frequency_domain_allocation: "row1",
bitmap: "0001",
cdm_type: "no_cdm",
density: 3,
first_symb: 4,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
power_control_offset: 0, /* dB */
power_control_offset_ss: 0, /* dB */
period: 40,
offset: 12,
qcl_info_periodic_csi_rs: 0,
},
{
csi_rs_id: 4,
n_ports: 1,
frequency_domain_allocation: "row1",
bitmap: "0001",
cdm_type: "no_cdm",
density: 3,
first_symb: 8,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
power_control_offset: 0, /* dB */
power_control_offset_ss: 0, /* dB */
period: 40,
offset: 12,
qcl_info_periodic_csi_rs: 0,
},
#endif
],
nzp_csi_rs_resource_set: [
{
csi_rs_set_id: 0,
nzp_csi_rs_resources: [ 0 ],
repetition: false,
},
#ifdef USE_TRS
{
csi_rs_set_id: 1,
nzp_csi_rs_resources: [ 1, 2, 3, 4 ],
repetition: false,
trs_info: true,
},
#endif
],
csi_im_resource: [
{
csi_im_id: 0,
pattern: 1,
subcarrier_location: 8,
symbol_location: 8,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
period: 80,
offset: 1, /* != 0 to avoid collision with SSB */
},
],
csi_im_resource_set: [
{
csi_im_set_id: 0,
csi_im_resources: [ 0 ],
}
],
/* ZP CSI-RS to set the CSI-IM REs to zero */
zp_csi_rs_resource: [
{
csi_rs_id: 0,
frequency_domain_allocation: "row4",
bitmap: "100",
n_ports: 4,
cdm_type: "fd_cdm2",
first_symb: 8,
density: 1,
rb_start: 0,
l_crb: -1, /* -1 means from rb_start to the end of the bandwidth */
period: 80,
offset: 1,
},
],
p_zp_csi_rs_resource_set: [
{
zp_csi_rs_resources: [ 0 ],
},
],
csi_resource_config: [
{
csi_rsc_config_id: 0,
nzp_csi_rs_resource_set_list: [ 0 ],
resource_type: "periodic",
},
{
csi_rsc_config_id: 1,
csi_im_resource_set_list: [ 0 ],
resource_type: "periodic",
},
#ifdef USE_TRS
{
csi_rsc_config_id: 2,
nzp_csi_rs_resource_set_list: [ 1 ],
resource_type: "periodic",
},
#endif
],
csi_report_config: [
{
resources_for_channel_measurement: 0,
csi_im_resources_for_interference: 1,
report_config_type: "periodic",
period: 80,
report_quantity: "CRI_RI_PMI_CQI",
#if N_ANTENNA_DL > 1
codebook_config: {
codebook_type: "type1",
sub_type: "typeI_SinglePanel",
#if N_ANTENNA_DL == 2
#elif N_ANTENNA_DL == 4
n1: 2,
n2: 1,
codebook_mode: 1,
#elif N_ANTENNA_DL == 8
n1: 4,
n2: 1,
codebook_mode: 1,
#endif
},
#endif
cqi_table: 2,
subband_size: "value1",
},
],
},
pucch: {
pucch_group_hopping: "neither",
hopping_id: -1, /* -1 = n_cell_id */
p0_nominal: -90,
#if 0
pucch0: {
initial_cyclic_shift: 1,
n_symb: 1,
},
#else
pucch1: {
n_cs: 3,
n_occ: 3,
freq_hopping: true,
},
#endif
#if 1
pucch2: {
n_symb: 2,
n_prb: 1,
freq_hopping: true,
simultaneous_harq_ack_csi: false,
max_code_rate: 0.25,
},
#endif
#if 0
pucch3: {
bpsk: false,
additional_dmrs: false,
freq_hopping: true,
n_prb: 1,
simultaneous_harq_ack_csi: false,
max_code_rate: 0.25,
},
#endif
#if 0
pucch4: {
occ_len: 4,
bpsk: false,
additional_dmrs: false,
freq_hopping: true,
simultaneous_harq_ack_csi: false,
max_code_rate: 0.25,
},
#endif
},
#if USE_SRS
srs: {
#if TDD_CONFIG == 1 || TDD_CONFIG == 2
srs_symbols: [ 0, 0, 0, 0, 0, 0, 0, 2, 0, 0 ],
#elif TDD_CONFIG == 3
srs_symbols: [ 0, 0, 0, 0, 0, 0, 2, 0, 0, 0 ],
#endif
srs_resource: [
{
srs_resource_id: 0,
n_ports: N_ANTENNA_UL,
resource_type: "periodic",
period: 80, /* in slots */
}
],
srs_resource_set: [
{
srs_resource_id_list: [ 0 ],
},
],
},
#endif
pusch: {
mapping_type: "typeA",
n_symb: 14,
dmrs_add_pos: 1,
dmrs_type: 1,
dmrs_max_len: 1,
tf_precoding: false,
mcs_table: "qam256", /* without transform precoding */
mcs_table_tp: "qam256", /* with transform precoding */
ldpc_max_its: 5,
k2: 4, /* delay in slots from DCI to PUSCH */
p0_nominal_with_grant: -76,
msg3_k2: 7,
msg3_mcs: 4,
msg3_delta_power: 0, /* in dB */
beta_offset_ack_index: 9,
/* if defined, force the PUSCH MCS for all UEs. Otherwise it is
computed from the last received PUSCH. */
/* mcs: 16, */
},
/* MAC configuration */
mac_config: {
msg3_max_harq_tx: 5,
ul_max_harq_tx: 5, /* max number of HARQ transmissions for uplink */
dl_max_harq_tx: 5, /* max number of HARQ transmissions for downlink */
ul_max_consecutive_retx: 30, /* disconnect UE if reached */
dl_max_consecutive_retx: 30, /* disconnect UE if reached */
periodic_bsr_timer: 20,
retx_bsr_timer: 320,
periodic_phr_timer: 500,
prohibit_phr_timer: 200,
phr_tx_power_factor_change: "dB3",
sr_prohibit_timer: 0, /* in ms, 0 to disable the timer */
sr_trans_max: 64,
},
cipher_algo_pref: [],
integ_algo_pref: [2, 1],
inactivity_timer: 10000,
drb_config: "drb_nr.cfg",
#if NR_TEST_MODE != -1
#if NR_TEST_MODE == 0
test_mode: {
type: "pdsch",
rnti: 0x100,
pdsch_retx: 0,
},
#elif NR_TEST_MODE == 1
test_mode: {
type: "pusch",
rnti: 0x100,
pusch_retx: 0,
},
#else
test_mode: {
type: "load",
ue_count: UE_COUNT,
},
#endif
#endif
},
}
import subprocess
import struct
import random
import os
import hashlib
def _hash(x):
return hashlib.sha256(repr(x).encode()).hexdigest()[:8]
def print_hash(name, x):
print("{:<32}: {}".format(name, _hash(x)))
def parse_binary(b):
return ''.join(map(lambda x:format(int(x), '0>2x'), b))
def parse_frame(buf):
c = [0]
def b(c, n):
ret = buf[c[0]:c[0]+n]
c[0] += n
return ret
dst_mac = b(c, 6)
src_mac = b(c, 6)
ether_type = b(c, 2)
ecpri_common_header = b(c, 4)
pc_id = b(c, 2)
seq_id = b(c, 2)
iq_samples = b(c, 240)
avg = sum(map(lambda x:int(x), iq_samples))
s = '''
dst_mac, src_mac, ether_type: {}, {}, {}
ecpri_common_header: {}
pc_id, seq_id: {}, {}
IQ samples: {}'''.format(
parse_binary(dst_mac),
parse_binary(src_mac),
parse_binary(ether_type),
parse_binary(ecpri_common_header),
parse_binary(pc_id),
parse_binary(seq_id),
parse_binary(iq_samples))[1:]
return (avg == 0, s)
# Parse RX/TX frame
def read_trace(name, n):
log_directory = '/root/ecpri-logs'
file_name = '{}/{}'.format(log_directory, name)
n = min(n * 262, os.path.getsize(file_name))
f = open(file_name, "rb")
data = f.read(n)
f.close()
print(name + ", frame number = " + str(n / 262))
return data
def print_frame(data):
n = len(data)
frame_len = 262
for i in range(int(n / frame_len)):
empty, s = parse_frame(data[i * frame_len:(i+1) * frame_len])
if empty:
continue
print("Frame example:\n\n{}\n".format(s))
break
def analyze_tdd(data):
null_frames = 0
tdd_switch_list = []
prev_empty = False
prev_i = 0
n = len(data)
frame_len = 262
for i in range(int(n / frame_len)):
empty, s = parse_frame(data[i * frame_len:(i+1) * frame_len])
if i > 0 and empty != prev_empty:
tdd_switch_list.append(i-prev_i)
prev_i = i
null_frames += empty
prev_empty = empty
tdd_switch_list.append(i-prev_i)
print('Frame analyzed: ' + str(i))
total_frames = int(n / frame_len)
ratio = total_frames / null_frames if null_frames > 0 else 'inf'
print('TDD ratio: {}\n'.format(ratio))
print('TDD switch list: ' + ', '.join(map(str, tdd_switch_list)) + '\n')
def print_frame_list(data, start, end):
n = len(data)
frame_len = 262
end = int(n / frame_len) if end == -1 else end
end = min(int(n / frame_len), end)
for i in range(start, end):
empty, s = parse_frame(data[i * frame_len:(i+1) * frame_len])
print(s)
print('')
def print_iq_list(data, start, end, tx=False):
n = len(data)
frame_len = 64 * 4
end = int(n / frame_len) if end == -1 else end
end = min(int(n / frame_len), end)
for i in range(start * 4, end * 4, 4):
iq = data[i * frame_len:(i+1) * frame_len]
iq_packed = []
k = 0
h = _hash(iq)
prev_x = 2
for x in map(lambda x: int(x != 0), iq):
if x == prev_x:
iq_packed[-1][0] += 1
else:
iq_packed.append([1, str(x) * 2])
prev_x = x
print(h + " " + " ".join(map(lambda x: "{}*{}".format(*x), iq_packed)))
# Parse RX/TX frame
def read_trx_trace(name, n, channels):
log_directory = '/root/ecpri-logs'
file_name = '{}/{}'.format(log_directory, name)
n = min(n * (4 * channels * 64), os.path.getsize(file_name))
f = open(file_name, "rb")
data = f.read(n)
f.close()
print(name + ", frame number = " + str(n / (4 * channels * 64)))
return data
def analyze_trx_tdd(data, channels):
null_frames = 0
tdd_switch_list = []
prev_empty = False
prev_i = 0
n = len(data)
frame_len = channels * 4 * 64
max_iq_sample = 0
min_iq_sample = 0
avg_iq_sample = 0
for i in range(int(n / frame_len)):
iq_samples = data[i * frame_len:(i+1) * frame_len]
iq_samples = [struct.unpack('f', iq_samples[4*j:4*(j+1)])[0] for j in range(int(len(iq_samples) / 4))]
iq_samples_abs_avg = sum(map(abs, iq_samples))
max_iq_sample = max(max_iq_sample, max(iq_samples))
min_iq_sample = min(min_iq_sample, min(iq_samples))
avg_iq_sample += iq_samples_abs_avg
empty = iq_samples_abs_avg < 0.1
if i > 0 and empty != prev_empty:
tdd_switch_list.append(i-prev_i)
prev_i = i
null_frames += empty
prev_empty = empty
tdd_switch_list.append(i-prev_i)
avg_iq_sample /= (n / frame_len) * (64 * channels)
print('Max IQ sample: {}, Min IQ sample: {}'.format(max_iq_sample, min_iq_sample))
print('Avg IQ sample: {}'.format(avg_iq_sample))
print('Frame analyzed: ' + str(i))
total_frames = int(n / frame_len)
ratio = total_frames / null_frames if null_frames > 0 else 'inf'
print('TDD ratio: {}\n'.format(ratio))
print('TDD switch list: ' + ', '.join(map(str, tdd_switch_list)) + '\n')
BF1_PATH="../bf1/bf1"
def float_to_byte(float_list):
iq_samples_bytes = bytearray(64 * 4)
for i in range(64):
b = struct.pack('f', float_list[i])
for j in range(4):
iq_samples_bytes[i * 4 + j] = b[j]
return bytes(iq_samples_bytes)
def byte_to_float(b):
float_list = []
for i in range(64):
x = struct.unpack('f', b[4 * i:4 * (i+1)])[0]
float_list.append(x)
return float_list
def compress(data):
s = subprocess.run(
[BF1_PATH, "c"],
input=data,
capture_output=True,
)
return s.stdout
def decompress(data):
s = subprocess.run([BF1_PATH, "d"],
input=data,
capture_output=True,
)
return s.stdout
def open_all(n):
return (
read_trace('tx.trace', n),
read_trace('rx.trace', n),
read_trx_trace('trxw.trace', n, 4),
read_trx_trace('trxr.trace', n, 1),
)
def analyze_all_tdd(n=0, data=None):
if data:
tx_data, rx_data, trxw_data, trxr_data = data
print("TX")
analyze_tdd(tx_data)
print("RX")
analyze_tdd(rx_data)
print("TRXW")
analyze_trx_tdd(trxw_data, 4)
print("TRXR")
analyze_trx_tdd(trxr_data, 1)
else:
data = read_trace('rx.trace', n)
analyze_tdd(data)
data = read_trace('tx.trace', n)
analyze_tdd(data)
data = read_trx_trace('trxw.trace', n, 4)
analyze_trx_tdd(data, 4)
data = read_trx_trace('trxr.trace', n, 1)
analyze_trx_tdd(data, 1)
def check_tx_trxw(n=0, data=None):
if data:
tx_data, rx_data, trxw_data, trxr_data = data
else:
tx_data = read_trace('tx.trace', n)
trxw_data = read_trx_trace('trxw.trace', n, 4)
for i in range(int(min(len(tx_data) / 262, len(trxw_data) / (64 * 4 * 4)))):
for j in range(1):
k = i * 4 + j
tx_iq = tx_data[(i * 262 + 22 + j * 60):(i * 262 + 22 + (j+1) * 60)]
trxw_iq = trxw_data[(k * (64 * 4)):((k+1) * (64 * 4))]
tx_iq_d = decompress(tx_iq)
trxw_iq_d = compress(trxw_iq)
if (_hash(tx_iq) != _hash(trxw_iq_d)) or (_hash(trxw_iq) != _hash(tx_iq_d)):
print((i,j))
print_hash("TX", tx_iq)
print_hash("TRX compressed", trxw_iq_d)
print_hash("TRX", trxw_iq)
print_hash("TX decompressed", tx_iq_d)
def print_all(n=0, data=None):
if data:
tx_data, rx_data, trxw_data, trxr_data = data
print('TX_DATA')
print_frame_list(tx_data, 0, 10)
print('RX_DATA')
print_frame_list(rx_data, 0, 10)
print('TRXW_DATA')
print('TRXR_DATA')
print_iq_list(trxr_data, 0, 10)
data = open_all(100000)
#check_tx_trxw(data=data)
#print_all(data=data)
analyze_all_tdd(data=data)
export PTP_INSTALL="/root/linuxptp"
export INTERFACE="ens9f0"
export DPDK_INSTALL="/root/dpdk-21.11"
export LTEENB="/root/enb/lteenb"
export ENB_CONFIG="enb.cfg"
export LOG_DIRECTORY="/root/ecpri-logs"
export LD_LIBRARY_PATH="/root/ecpri-priv:/root/enb:$LD_LIBRARY_PATH"
export AMARISOFT_PATH="/root/.amarisoft"
export PTP_INSTALL="/root/linuxptp"
export INTERFACE="ens9f0"
export DPDK_INSTALL="/root/dpdk-stable-20.11.3"
export LTEENB="/root/enb/lteenb"
export ENB_CONFIG="enb-hfr.cfg"
export LOG_DIRECTORY="/home/hfr/ecpri-logs"
export LD_LIBRARY_PATH="/root/ecpri-priv:/root/enb:$LD_LIBRARY_PATH"
export AMARISOFT_PATH="/root/.amarisoft"
...@@ -2,16 +2,7 @@ ...@@ -2,16 +2,7 @@
DIR=$(dirname $(realpath $0)) DIR=$(dirname $(realpath $0))
#HFR
#DPDK_INSTALL=/root/dpdk-stable-20.11.3
#Tiogapass004
DPDK_INSTALL=/root/dpdk-21.11
cd $DPDK_INSTALL/usertools; cd $DPDK_INSTALL/usertools;
python3 dpdk-hugepages.py --setup 2G; python3 dpdk-hugepages.py --setup 2G;
cd $DIR/..; cd $DIR/.. && make;
make; ln -sf $DIR/../libtrx_ecpri_dpdk.so $(dirname $LTEENB)/trx_ecpri.so
cd test-eNB;
make all;
ln -sf /root/ecpri-priv/libtrx_ecpri_dpdk.so /root/enb/trx_ecpri.so
ln -sf /root/ecpri-priv/enb.cfg /root/enb/config/enb.cfg
# Tiogapass004 test chrt -f 97 taskset -c 39 phc2sys -m -c $INTERFACE -s CLOCK_REALTIME -O0 -f $PTP_INSTALL/configs/G.8275.1.cfg
#chrt -f 97 taskset -c 2 phc2sys -m -c ens9f1 -s CLOCK_REALTIME -O0 -f $HOME/linuxptp/configs/G.8275.1.cfg
#chrt -f 97 taskset -c 2 phc2sys -m -s ens9f1 -c CLOCK_REALTIME -O0 -f $HOME/linuxptp/configs/G.8275.1.cfg
# HFR Switch
chrt -f 97 taskset -c 39 phc2sys -m -c ens9f0 -s CLOCK_REALTIME -O0 -f $HOME/linuxptp/configs/G.8275.1.cfg
# Tiogapass003 test chrt -f 97 taskset -c 38 ptp4l -H -i $INTERFACE -m -f $PTP_INSTALL/configs/G.8275.1.cfg
#chrt -f 97 taskset -c 38 ptp4l -H -i ens9f1 -m -f $HOME/linuxptp/configs/G.8275.1.cfg
# HFR Switch
chrt -f 97 taskset -c 38 ptp4l -H -i ens9f0 -m -f $HOME/linuxptp/configs/G.8275.1.cfg
#!/bin/bash #!/bin/bash
/root/stop-cpri.sh
DIR=$(dirname $(realpath $0))
$DIR/stop-cpri.sh
systemctl start lte systemctl start lte
...@@ -2,23 +2,13 @@ ...@@ -2,23 +2,13 @@
DIR=$(dirname $(realpath $0)) DIR=$(dirname $(realpath $0))
#HFR
#LOGD=/home/hfr
#Tiogapass004
LOGD=/root/ecpri-logs
systemctl stop lte; systemctl stop lte;
$DIR/stop-ecpri.sh; $DIR/stop-ecpri.sh;
cd $DIR/..; cd $DIR/..;
#make clean;
make; make;
cd test-eNB;
make all;
export LD_LIBRARY_PATH="/root/ecpri-priv:/root/enb:$LD_LIBRARY_PATH" #$DIR/launch-ptp > $LOG_DIRECTORY/ptp.log 2> $LOG_DIRECTORY/ptp.error &
export AMARISOFT_PATH="/root/.amarisoft" #$DIR/launch-phc2sys > $LOG_DIRECTORY/phc2sys.log 2> $LOG_DIRECTORY/phc2sys.error &
cd $DIR/../test-eNB; $LTEENB $DIR/../enb-configs/$ENB_CONFIG
$DIR/../launch-ptp > $LOGD/ptp.log 2> $LOGD/ptp.error &
$DIR/../launch-phc2sys > $LOGD/phc2sys.log 2> $LOGD/phc2sys.error &
#./test-dpdk-ecpri > $LOGD/ecpri.log 2> $LOGD/ecpri.error &
~/enb/lteenb ~/enb/config/enb.cfg
...@@ -38,12 +38,11 @@ typedef struct { ...@@ -38,12 +38,11 @@ typedef struct {
const char * rec_mac; const char * rec_mac;
const char * rec_if; const char * rec_if;
const char * dpdk_options; const char * dpdk_options;
const char * trace_file; const char * log_directory;
const char * stats_file;
int recv_affinity; int recv_affinity;
int send_affinity; int send_affinity;
int prepare_affinity; int encode_affinity;
int decompress_affinity; int decode_affinity;
int statistic_affinity; int statistic_affinity;
int ecpri_period; int ecpri_period;
int flow_id; int flow_id;
...@@ -109,7 +108,8 @@ static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) { ...@@ -109,7 +108,8 @@ static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
} }
TRXState s1; TRXState s1;
float ** tx_samples; float ** tx_samples_zeroes;
float ** tx_samples_ones;
float ** rx_samples; float ** rx_samples;
void dummy_enb_init(TRXState *s1, TRXEcpriState *s); void dummy_enb_init(TRXState *s1, TRXEcpriState *s);
...@@ -174,7 +174,7 @@ int main(int argc, char * argv[]) { ...@@ -174,7 +174,7 @@ int main(int argc, char * argv[]) {
s->dpdk_options = "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.1 "; s->dpdk_options = "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.1 ";
#endif #endif
#if 1 #if 0
// tiogapass-004 MT27710 port1 // tiogapass-004 MT27710 port1
s->rec_mac = "b8:59:9f:07:86:43"; s->rec_mac = "b8:59:9f:07:86:43";
//s->re_mac = "04:09:a5:0f:9f:4c"; // Lille M6424 Switch //s->re_mac = "04:09:a5:0f:9f:4c"; // Lille M6424 Switch
...@@ -192,18 +192,33 @@ int main(int argc, char * argv[]) { ...@@ -192,18 +192,33 @@ int main(int argc, char * argv[]) {
s->dpdk_options = "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.1 -b 0000:5e:00.0 -b 0000:5e:00.1 "; s->dpdk_options = "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.1 -b 0000:5e:00.0 -b 0000:5e:00.1 ";
#endif #endif
//s->recv_affinity = 39;
//s->send_affinity = 38;
//s->prepare_affinity = 37;
//s->decompress_affinity = 36;
//s->statistic_affinity = 35;
//s->ecpri_period = 800;
//s->flow_id = 0;
//s->sample_rate = 122880000;
//s->trace_file = "/root/ecpri-logs/rx.trace";
//s->stats_file = "/root/ecpri-logs/ecpri.stats";
//s->trace_period = 1000000;
// enb.cfg
s->rec_mac = "b8:59:9f:07:86:42";
s->re_mac = "04:09:a5:0f:9f:4a";
s->rec_if = "ens9f0";
s->dpdk_options = "-l 10,20 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.1 ";
s->recv_affinity = 39; s->recv_affinity = 39;
s->send_affinity = 38; s->send_affinity = 38;
s->prepare_affinity = 37; s->encode_affinity = 37;
s->decompress_affinity = 36; s->decode_affinity = 36;
s->statistic_affinity = 35; s->statistic_affinity = 35;
s->ecpri_period = 800; s->ecpri_period = 800;
s->flow_id = 0; s->flow_id = 0;
s->sample_rate = 122880000; s->trace_period = 10000000;
s->log_directory = "/root/ecpri-logs";
s->trace_file = "/root/ecpri-logs/rx.trace";
s->stats_file = "/root/ecpri-logs/ecpri.stats";
s->trace_period = 1000000;
log_info("TEST-DPDK-ECPRI", "Starting test...\n"); log_info("TEST-DPDK-ECPRI", "Starting test...\n");
...@@ -217,17 +232,21 @@ int main(int argc, char * argv[]) { ...@@ -217,17 +232,21 @@ int main(int argc, char * argv[]) {
static void enb(TRXState * s1, TRXEcpriState * s) { static void enb(TRXState * s1, TRXEcpriState * s) {
struct timespec next; struct timespec next;
trx_timestamp_t ptimestamp; trx_timestamp_t ptimestamp;
int64_t p = 1000000 * 100; int64_t p = 100000 * 10;
int m = 1; int m = 1;
int64_t first_rx_ts;
clock_gettime(CLOCK_TAI, &next); clock_gettime(CLOCK_TAI, &next);
tx_samples = (float**) malloc(sizeof(float*) * 4); tx_samples_zeroes = (float**) malloc(sizeof(float*) * 4);
tx_samples_ones = (float**) malloc(sizeof(float*) * 4);
rx_samples = (float**) malloc(sizeof(float*) * 4); rx_samples = (float**) malloc(sizeof(float*) * 4);
for(int i = 0; i < 4; i++) { for(int i = 0; i < 4; i++) {
tx_samples[i] = (float*) malloc(sizeof(float) * 65536); tx_samples_zeroes[i] = (float*) malloc(sizeof(float) * 65536);
tx_samples_ones[i] = (float*) malloc(sizeof(float) * 65536);
rx_samples[i] = (float*) malloc(sizeof(float) * 65536); rx_samples[i] = (float*) malloc(sizeof(float) * 65536);
for(int j = 0; j < 65536; j++) { for(int j = 0; j < 65536; j++) {
tx_samples[i][j] = i * j; tx_samples_zeroes[i][j] = 0.0f;
tx_samples_ones[i][j] = 1.0f;
} }
} }
...@@ -235,12 +254,12 @@ static void enb(TRXState * s1, TRXEcpriState * s) { ...@@ -235,12 +254,12 @@ static void enb(TRXState * s1, TRXEcpriState * s) {
int64_t tx_timestamp = 256 * (INT64_C(3840000) * ((int64_t) p * i + p)) / (INT64_C(1000000000)); int64_t tx_timestamp = 256 * (INT64_C(3840000) * ((int64_t) p * i + p)) / (INT64_C(1000000000));
add_ns(&next, p); add_ns(&next, p);
//log_info("TEST-DPDK-ECPRI", "Reading 256 samples"); s1->trx_read_func2(s1, &ptimestamp, rx_samples, 32, 0, NULL);
s1->trx_read_func2(s1, &ptimestamp, rx_samples, 256 * m, 0, NULL); if(i == 0)
//log_info("TEST-DPDK-ECPRI", "Writing 256 samples, %" PRIi64, tx_timestamp); first_rx_ts = ptimestamp;
s1->trx_write_func2(s1, tx_timestamp, tx_samples, 256 * m, 0, NULL); s1->trx_write_func2(s1, first_rx_ts + 2*i*32, tx_samples_zeroes, 32, 0, NULL);
s1->trx_write_func2(s1, tx_timestamp + 256 * m + 100, tx_samples, 256 * m, 0, NULL); s1->trx_read_func2(s1, &ptimestamp, rx_samples, 32, 0, NULL);
s1->trx_write_func2(s1, tx_timestamp + 2 * 256 * m + 110, tx_samples, 256 * m, 0, NULL); s1->trx_write_func2(s1, first_rx_ts + (2*i + 1)*32, tx_samples_ones, 32, 0, NULL);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL); clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
} }
......
...@@ -47,147 +47,48 @@ ...@@ -47,147 +47,48 @@
#define SSE4 // define if CPU supports SSE4.1 #define SSE4 // define if CPU supports SSE4.1
#define DST_ADDR_SYNTAX // Depends on DPDK version #define DST_ADDR_SYNTAX // Depends on DPDK version
//#define SEND_LIMIT (100000)
//#define TRACE
/* Proprietary code: /* Proprietary code:
- compression / decompression of IQ samples - compression / decompression of IQ samples
- fast conversion between int16_t and float - fast conversion between int16_t and float
*/ */
#include "private.c" #include "private.c"
#include "utils.c"
/* eCPRI Send and Recv */ /* eCPRI Send and Recv */
#define N_SAMPLES 256 #define TX_N_SAMPLES 256
#define PACKET_SIZE 262 #define RX_N_SAMPLES 64
#define DATA_SIZE 248 #define TX_PACKET_SIZE 262
#define RX_MAX_PACKET_SIZE 262
#define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14)
#define RX_N_CHANNEL 1
#define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency #define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
//#define SEND_LIMIT (1250 * 10)
#define TRX_WB_MAX_PARTS 1000 #define TRX_WB_MAX_PARTS 1000
#define TRX_BUF_MAX_SIZE 1000 #define TRX_BUF_MAX_SIZE 20000
#define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000) #define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000)
#define TRACE_BUFFER_SIZE_MB 200
static void log_error(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " ERROR [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
exit(EXIT_FAILURE);
}
static volatile int64_t limit_counter = 0;
static inline void log_limit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
if(limit_counter++ % 1000000)
return;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " DEBUG [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
static void log_info(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " INFO [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#ifdef DEBUG
static void log_debug(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " DEBUG [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#else
#define log_debug(...)
#endif
static int latency_target_fd = -1;
static int32_t latency_target_value = 0;
/* Latency trick
* if the file /dev/cpu_dma_latency exists,
* open it and write a zero into it. This will tell
* the power management system not to transition to
* a high cstate (in fact, the system acts like idle=poll)
* When the fd to /dev/cpu_dma_latency is closed, the behavior
* goes back to the system default.
*
* Documentation/power/pm_qos_interface.txt
*/
void set_latency_target(void) {
struct stat s;
int err;
errno = 0;
err = stat("/dev/cpu_dma_latency", &s);
if (err == -1) {
error(EXIT_FAILURE, errno, "WARN: stat /dev/cpu_dma_latency failed");
return;
}
errno = 0;
latency_target_fd = open("/dev/cpu_dma_latency", O_RDWR);
if (latency_target_fd == -1) {
error(EXIT_FAILURE, errno, "WARN: open /dev/cpu_dma_latency");
return;
}
errno = 0;
err = write(latency_target_fd, &latency_target_value, 4);
if (err < 1) {
error(EXIT_FAILURE, errno, "# error setting cpu_dma_latency to %d!",
latency_target_value);
close(latency_target_fd);
return;
}
printf("# /dev/cpu_dma_latency set to %dus\n", latency_target_value);
}
typedef struct { typedef struct {
volatile void * buffer; const uint8_t * re_mac;
char name[64]; const uint8_t * rec_mac;
size_t buf_len; const uint8_t * rec_if;
size_t len; const uint8_t * dpdk_options;
volatile int write_index; const uint8_t * log_directory;
volatile int read_index; int recv_affinity;
} ring_buffer_t; int send_affinity;
int encode_affinity;
int decode_affinity;
int statistic_affinity;
int ecpri_period;
int flow_id;
int sample_rate;
int trace_period;
} TRXEcpriState;
typedef struct { typedef struct {
int64_t counter; int64_t counter;
...@@ -197,22 +98,36 @@ typedef struct { ...@@ -197,22 +98,36 @@ typedef struct {
} counter_stat_t; } counter_stat_t;
typedef struct { typedef struct {
const char * re_mac; volatile void * buffer;
const char * rec_mac; uint8_t name[64];
const char * rec_if; int buf_len;
const char * dpdk_options; int len;
const char * trace_file; volatile int write_index;
const char * stats_file; volatile int read_index;
int recv_affinity; volatile int write_ahead;
int send_affinity; pthread_mutex_t ahead_mutex;
int prepare_affinity; } ring_buffer_t;
int decompress_affinity;
int statistic_affinity; #ifdef TRACE
int ecpri_period; typedef struct {
int flow_id; int64_t size;
int sample_rate; volatile int64_t counter;
int trace_period; volatile uint8_t * buffer;
} TRXEcpriState; } buffer_t;
static void init_buffer(buffer_t * buffer, int64_t size) {
buffer->size = size;
buffer->buffer = (uint8_t *) malloc(size);
buffer->counter = 0;
memset(buffer->buffer, 0, size);
}
static void write_buffer(buffer_t * buffer, int i, uint8_t * source, int64_t len) {
if(buffer->counter + len >= buffer->size)
return;
memcpy((uint8_t*) (buffer->buffer + i + buffer->counter), source, len);
buffer->counter += len + i;
}
#endif
// Buffers // Buffers
static ring_buffer_t rx_rbuf; // Received packets static ring_buffer_t rx_rbuf; // Received packets
...@@ -222,71 +137,67 @@ static ring_buffer_t trx_write_rbuf; // Uncompressed IQ samples ...@@ -222,71 +137,67 @@ static ring_buffer_t trx_write_rbuf; // Uncompressed IQ samples
// List of timestamps at which data should be sent // List of timestamps at which data should be sent
static volatile int64_t trx_wb_ts[TRX_WB_MAX_PARTS]; static volatile int64_t trx_wb_ts[TRX_WB_MAX_PARTS];
// List of corresponding indexes in trx_write_rbuf // List of corresponding indexes in trx_write_rbuf
static volatile int trx_wb_part[TRX_WB_MAX_PARTS]; // TODO write next index instead of current static volatile int trx_wb_part[TRX_WB_MAX_PARTS];
static int trx_wb_part_read_index; static int trx_wb_part_read_index;
static int trx_wb_part_write_index; static int trx_wb_part_write_index;
// Locks // Locks
pthread_mutex_t tx_mutex; pthread_mutex_t tx_mutex;
pthread_cond_t tx_cond; pthread_cond_t tx_cond;
pthread_mutex_t encode_mutex;
pthread_cond_t encode_cond;
pthread_mutex_t rx_mutex; pthread_mutex_t rx_mutex;
pthread_cond_t rx_cond; pthread_cond_t rx_cond;
pthread_mutex_t tx_ready_mutex; pthread_mutex_t decode_mutex;
pthread_cond_t tx_ready_cond; pthread_cond_t decode_cond;
pthread_mutex_t trx_write_mutex; pthread_mutex_t trx_write_mutex;
pthread_cond_t trx_write_cond; pthread_cond_t trx_write_cond;
sem_t trx_read_sem; sem_t trx_read_sem;
// Counters // Counters
static volatile counter_stat_t prepared_counter; // compressed samples static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // compressed samples
static volatile counter_stat_t sent_counter; // frames sent to eRE static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t recv_counter; // frames received from eRE
#define STAT_FRAME_INTERVAL INT64_C(3800000) #define STAT_FRAME_INTERVAL INT64_C(3800000)
static volatile int sync_complete = 0;
static volatile uint8_t iq_frame_full[1024];
static volatile uint8_t iq_frame_empty[1024];
// Computed values // Computed values
static int rxtx_buf_size; static int rxtx_buf_size;
static int ecpri_period_mult; static int ecpri_period_mult;
// Network // Network
static volatile int seq_id; static volatile int seq_id;
// Timestamps utils #ifdef TRACE
#define NSEC_PER_SEC INT64_C(1000000000) static buffer_t tx_trace_buffer;
static struct timespec int_to_ts(int64_t t) { static buffer_t rx_trace_buffer;
struct timespec ts; static buffer_t trxw_trace_buffer;
ts.tv_sec = t / NSEC_PER_SEC; static buffer_t trxr_trace_buffer;
ts.tv_nsec = t - (ts.tv_sec * NSEC_PER_SEC); #endif
return ts;
}
static int64_t ts_to_int(struct timespec ts) {
return ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec;
}
static void add_ns(struct timespec *t, int64_t ns) {
t->tv_nsec += ns;
while (t->tv_nsec >= ((int64_t)NSEC_PER_SEC)) {
t->tv_sec += 1;
t->tv_nsec -= NSEC_PER_SEC;
}
}
static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
int64_t diff;
diff = NSEC_PER_SEC * ((int)t1.tv_sec - (int)t2.tv_sec);
diff += ((int)t1.tv_nsec - (int)t2.tv_nsec);
return diff;
}
static void rbuf_update_write_index(ring_buffer_t * rbuf) { static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len; rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
} }
static void rbuf_update_read_index(ring_buffer_t * rbuf) { static void rbuf_update_read_index(ring_buffer_t * rbuf) {
rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len; rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len;
} }
static int rbuf_read_amount(const ring_buffer_t * rbuf) { static int rbuf_read_amount(ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len; return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
} }
static int rbuf_write_amount(const ring_buffer_t * rbuf) { static int rbuf_write_amount(ring_buffer_t * rbuf) {
return (rbuf->read_index + rbuf->buf_len - rbuf->write_index) % rbuf->buf_len; // Don't write everything to avoid write index catching up to read index
// That we way we don't have to use locks
return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len);
} }
#define RBUF_READ(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len)) #define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len)) #define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_WRITE(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.write_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\ #define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\ {\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len * sizeof(type)));\ log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len * sizeof(type)));\
...@@ -296,76 +207,12 @@ static int rbuf_write_amount(const ring_buffer_t * rbuf) { ...@@ -296,76 +207,12 @@ static int rbuf_write_amount(const ring_buffer_t * rbuf) {
rbuf.len = _len;\ rbuf.len = _len;\
rbuf.write_index = 0;\ rbuf.write_index = 0;\
rbuf.read_index = 0;\ rbuf.read_index = 0;\
rbuf.write_ahead = 0;\
pthread_mutex_init(&rbuf.ahead_mutex, NULL);\
} while(0) } while(0)
/* DPDK */ #include "dpdk.c"
#define BURST_SIZE 16
#define MEMPOOL_CACHE_SIZE 256
#define RTE_TEST_RX_DESC_DEFAULT 1024
#define RTE_TEST_TX_DESC_DEFAULT 1024
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
struct rte_mempool *mbuf_pool;
struct rte_ether_addr s_addr;
struct rte_ether_addr d_addr;
int8_t data[BURST_SIZE][PACKET_SIZE];
static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_lro_pkt_size = RTE_ETHER_MAX_LEN }
};
static inline int port_init(int portid, struct rte_mempool *mbuf_pool) {
struct rte_eth_conf port_conf = port_conf_default;
const uint16_t rx_rings = 1, tx_rings = 1;
int retval;
uint16_t q;
retval = rte_eth_dev_configure(portid, rx_rings, tx_rings, &port_conf);
if (retval != 0)
return retval;
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (retval < 0)
return retval;
}
/* Allocate and set up 1 TX queue per Ethernet port. */
for (q = 0; q < tx_rings; q++) {
retval = rte_eth_tx_queue_setup(portid, q, nb_txd,
rte_eth_dev_socket_id(portid), NULL);
if (retval < 0)
return retval;
}
/* Start the Ethernet port. */
retval = rte_eth_dev_start(portid);
if (retval < 0)
return retval;
return 0;
}
static void init_dpdk(int argc, char ** argv) {
unsigned int nb_mbufs;
int ret;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "initlize fail!");
argc -= ret;
argv += ret;
nb_mbufs = RTE_MAX((nb_rxd + nb_txd + BURST_SIZE + MEMPOOL_CACHE_SIZE), 8192U);
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
if (port_init(0, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 0);
}
static void send_packets(int port) { static void send_packets(int port) {
struct rte_mbuf * pkt[BURST_SIZE]; struct rte_mbuf * pkt[BURST_SIZE];
struct rte_ether_hdr *eth_hdr; struct rte_ether_hdr *eth_hdr;
...@@ -393,8 +240,8 @@ static void send_packets(int port) { ...@@ -393,8 +240,8 @@ static void send_packets(int port) {
} }
#endif #endif
eth_hdr->ether_type = htons(0xaefe); eth_hdr->ether_type = htons(0xaefe);
memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), data[i], DATA_SIZE); memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), tx_data[i], TX_ECPRI_PACKET_SIZE);
pkt_size = PACKET_SIZE; pkt_size = TX_PACKET_SIZE;
pkt[i]->data_len = pkt_size; pkt[i]->data_len = pkt_size;
pkt[i]->pkt_len = pkt_size; pkt[i]->pkt_len = pkt_size;
} }
...@@ -407,31 +254,10 @@ static void send_packets(int port) { ...@@ -407,31 +254,10 @@ static void send_packets(int port) {
uint16_t buf; uint16_t buf;
for (buf = nb_tx; buf < BURST_SIZE; buf++) for (buf = nb_tx; buf < BURST_SIZE; buf++)
rte_pktmbuf_free(pkt[buf]); rte_pktmbuf_free(pkt[buf]);
fprintf(stderr, "Sent %d packets instead of %d\n", nb_tx, BURST_SIZE); log_exit("SEND_THREAD", "Sent %d packets instead of %d", nb_tx, BURST_SIZE);
exit(EXIT_FAILURE);
} }
} }
// TODO store received packets' data in buffer
static int recv_packets(int port) {
struct rte_mbuf * pkt[1024];
uint8_t * buf;
uint8_t * rtebuf;
while(1) {
const int nb_rx = rte_eth_rx_burst(port, 0, pkt, 1024);
for(int i = 0; i < nb_rx; i++) {
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off;
memcpy(buf, rtebuf, (pkt[i])->pkt_len);
rbuf_update_write_index(&rx_rbuf);
rte_pktmbuf_free(pkt[i]);
}
if(nb_rx)
return nb_rx;
}
}
/* DPDK */ /* DPDK */
static void init_counter(volatile counter_stat_t * c) { static void init_counter(volatile counter_stat_t * c) {
...@@ -454,14 +280,61 @@ static void update_counter(volatile counter_stat_t * c, int64_t v) { ...@@ -454,14 +280,61 @@ static void update_counter(volatile counter_stat_t * c, int64_t v) {
c->pps_ts = ts; c->pps_ts = ts;
} }
} }
#ifdef SEND_LIMIT
static void send_limit_handler(struct timespec initial, TRXEcpriState * s) {
struct timespec next;
if(((tx_trace_buffer.counter / 262) > SEND_LIMIT) &&
((rx_trace_buffer.counter / 262) > SEND_LIMIT) &&
((trxw_trace_buffer.counter / 1024) > SEND_LIMIT) &&
((trxr_trace_buffer.counter / 256) > SEND_LIMIT)) {
int64_t d;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
log_info("SEND_THREAD", "Packets sent: %" PRIi64, sent_counter.counter);
log_info("SEND_THREAD", "Duration: %" PRIi64, d);
log_info("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_info("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
#ifdef TRACE
FILE * f;
uint8_t n[256];
log_info("SEND_THREAD", "tx_trace_buffer counter: %li", tx_trace_buffer.counter);
log_info("SEND_THREAD", "rx_trace_buffer counter: %li", rx_trace_buffer.counter);
log_info("SEND_THREAD", "trxw_trace_buffer counter: %li", trxw_trace_buffer.counter);
log_info("SEND_THREAD", "trxr_trace_buffer counter: %li", trxr_trace_buffer.counter);
memset(n, '\0', 256);
sprintf(n, "%s/tx.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite(tx_trace_buffer.buffer, tx_trace_buffer.counter, 1, f);
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/rx.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite(rx_trace_buffer.buffer, rx_trace_buffer.counter, 1, f);
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxr.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite(trxr_trace_buffer.buffer, trxr_trace_buffer.counter, 1, f);
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxw.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite(trxw_trace_buffer.buffer, trxw_trace_buffer.counter, 1, f);
fclose(f);
#endif
exit(EXIT_SUCCESS);
}
}
#endif
// Receives as fast as possible
// Signal to decode thread when packets receiving
static void *recv_thread(void *p) { static void *recv_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
log_info("RECV_THREAD", "Thread init"); log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask); CPU_SET(s->recv_affinity, &mask);
...@@ -469,152 +342,183 @@ static void *recv_thread(void *p) { ...@@ -469,152 +342,183 @@ static void *recv_thread(void *p) {
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
for(;;) { for(;;) {
struct rte_mbuf * pkt[1024];
uint8_t * buf, * rtebuf;
int port = 0;
int nb_rx;
update_counter(&recv_counter, recv_packets(0)); while(1) {
int n = rbuf_write_amount(&rx_rbuf);
nb_rx = rte_eth_rx_burst(port, 0, pkt, 1024);
if(nb_rx > n)
log_exit("RECV_THREAD", "%lip available to write in rx_rbuf, but %lip received", n, nb_rx);
pthread_mutex_lock(&rx_mutex); for(int i = 0; i < nb_rx; i++) {
pthread_cond_signal(&rx_cond); buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
pthread_mutex_unlock(&rx_mutex); rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off;
if((pkt[i])->pkt_len > RX_MAX_PACKET_SIZE)
log_exit("RECV_THREAD", "Received packet of length %u, but RX_MAX_PACKET_SIZE = %u", (pkt[i])->pkt_len, RX_MAX_PACKET_SIZE);
memcpy(buf, rtebuf, (pkt[i])->pkt_len);
#ifdef TRACE
write_buffer(&rx_trace_buffer, 0, (uint8_t*) rtebuf, RX_MAX_PACKET_SIZE);
#endif
rbuf_update_write_index(&rx_rbuf);
rte_pktmbuf_free(pkt[i]);
}
if(nb_rx)
break;
}
update_counter(&recv_counter, nb_rx);
pthread_mutex_lock(&rx_mutex); pthread_cond_signal(&rx_cond); pthread_mutex_unlock(&rx_mutex);
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
} }
// Send as soon as packets are encoded
// Signal to encode thread that packets has been sent
static void *send_thread(void *p) { static void *send_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
struct timespec initial, next; struct timespec initial;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
log_info("SEND_THREAD", "Thread init"); log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask); CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_wait(&tx_ready_cond, &tx_ready_mutex);
pthread_mutex_unlock(&tx_ready_mutex);
clock_gettime(CLOCK_TAI, &initial); clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) { for(int64_t i = 1;; i++) {
#ifdef SEND_LIMIT #ifdef SEND_LIMIT
if(i > SEND_LIMIT) { send_limit_handler(initial, s);
int64_t d;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
log_debug("SEND_THREAD", "Packets sent: %" PRIi64, sent_counter.counter);
log_debug("SEND_THREAD", "Duration: %" PRIi64, d);
log_debug("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_debug("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
exit(EXIT_SUCCESS);
}
#endif #endif
// Prevent overflow int64_t n = rbuf_read_amount(&tx_rbuf);
if(i >= 3000000) { pthread_mutex_lock(&encode_mutex);
add_ns(&initial, (ecpri_period_mult * NSEC_PER_SEC * i) / FRAME_FREQ); if(n >= BURST_SIZE) {
i = 0; pthread_mutex_unlock(&encode_mutex);
}
next = initial;
// Multiply by i everytime to prevent any frequence drift
add_ns(&next, (ecpri_period_mult * NSEC_PER_SEC * i) / FRAME_FREQ);
for(int j = 0; j < (ecpri_period_mult / BURST_SIZE); j++) {
for(int k = 0; k < BURST_SIZE; k++) { for(int k = 0; k < BURST_SIZE; k++) {
memcpy(data[k], RBUF_READ(tx_rbuf, uint8_t), tx_rbuf.len); memcpy(tx_data[k], RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len);
rbuf_update_read_index(&tx_rbuf); rbuf_update_read_index(&tx_rbuf);
} }
send_packets(0); send_packets(0);
update_counter(&sent_counter, BURST_SIZE); update_counter(&sent_counter, BURST_SIZE);
pthread_mutex_lock(&tx_mutex); pthread_cond_signal(&tx_cond); pthread_mutex_unlock(&tx_mutex);
}
else {
pthread_cond_wait(&encode_cond, &encode_mutex);
pthread_mutex_unlock(&encode_mutex);
} }
pthread_mutex_lock(&tx_mutex);
pthread_cond_signal(&tx_cond);
pthread_mutex_unlock(&tx_mutex);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
} }
static void *prepare_thread(void *p) { /*
If sync has happenned (=we have received frames):
Prepare as soon as TRX has packet to write
Signal
Else:
Prepare as soon as there is space in tx buffer
*/
#define TX_SYNC_BURST_SIZE 512
static void *encode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
int tx_ready_buffer_full = 0; int trx_started = 0;
int tx_started = 0; struct timespec next;
log_info("PREPARE_THREAD", "Thread init"); int64_t target_counter = 0;
int reset_encode_counter = 1;
int first_ts = 1;
log_info("PREPARE_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->prepare_affinity, &mask); CPU_SET(s->encode_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->prepare_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);
for(int64_t i = 0;; i++) { for(int64_t i = 0;; i++) {
int16_t samples_int[N_SAMPLES]; int16_t samples_int[TX_N_SAMPLES];
int n;
// If we have frames to prepare if(sync_complete && reset_encode_counter) {
int n = rbuf_write_amount(&tx_rbuf); encode_counter.counter = 0;
n = n < 500 ? n : 500; reset_encode_counter = 0;
if((i == 0) || n) { seq_id = 0;
// If there are frames from trx_write callback to prepare }
if(rbuf_read_amount(&trx_write_rbuf)) {
int64_t ts;
if(!tx_started)
prepared_counter.counter = 0;
tx_started = 1;
if(trx_wb_part_read_index == trx_wb_part_write_index) { // If we have frames to encode (is there space in TX buffer)
pthread_mutex_lock(&tx_mutex);
n = rbuf_write_amount(&tx_rbuf);
if(n) {
pthread_mutex_unlock(&tx_mutex);
// If there are frames from trx_write callback to encode
pthread_mutex_lock(&trx_write_mutex); pthread_mutex_lock(&trx_write_mutex);
pthread_cond_signal(&trx_write_cond); if(rbuf_read_amount(&trx_write_rbuf)) {
int64_t ts, frames_until_ts;
pthread_mutex_unlock(&trx_write_mutex); pthread_mutex_unlock(&trx_write_mutex);
continue;
}
trx_started = 1;
// Get the next timestamp at which we should write // Get the next timestamp at which we should write
ts = trx_wb_ts[trx_wb_part_read_index]; ts = trx_wb_ts[trx_wb_part_read_index];
// Number of empty frames to insert before next write frames_until_ts = ts - encode_counter.counter;
int64_t empty_frames_ahead = ts - prepared_counter.counter;
// We are sending only n frames in this iteration if(frames_until_ts > 0) {
empty_frames_ahead = empty_frames_ahead < n ? empty_frames_ahead : n; if(!first_ts) {
log_exit("ENCODE_THREAD", "Gap between TRX timestamps: %li", frames_until_ts);
if(empty_frames_ahead > 0) { // Send the empty frames }
for(int j = 0; j < empty_frames_ahead; j++) { else {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++); int nb_frames = frames_until_ts > n ? n : frames_until_ts;
for(int j = 0; j < nb_frames; j++) {
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
#ifdef TRACE
write_buffer(&tx_trace_buffer, 14, (uint8_t*) RBUF_WRITE0(tx_rbuf, uint8_t), 248);
#endif
rbuf_update_write_index(&tx_rbuf); rbuf_update_write_index(&tx_rbuf);
update_counter(&prepared_counter, 1);
} }
update_counter(&encode_counter, nb_frames);
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
if(nb_frames == frames_until_ts)
first_ts = 0;
} }
else if (empty_frames_ahead == 0) { }
if (frames_until_ts == 0) {
int next_trx_index = trx_wb_part[(trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS]; int next_trx_index = trx_wb_part[(trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS];
// TRX frames to read and encode // TRX frames to encode
int nb_frames = next_trx_index - trx_write_rbuf.read_index; int nb_frames = (trx_write_rbuf.buf_len + next_trx_index - trx_write_rbuf.read_index) % trx_write_rbuf.buf_len;
int left_frames = nb_frames; int left_frames = nb_frames;
nb_frames = nb_frames < n ? nb_frames : n; nb_frames = nb_frames < n ? nb_frames : n;
left_frames -= nb_frames; left_frames -= nb_frames;
for(int j = 0; j < nb_frames; j++) { for(int j = 0; j < nb_frames; j++) {
float * const trx_samples = RBUF_READ(trx_write_rbuf, float); float * const trx_samples = RBUF_READ0(trx_write_rbuf, float);
uint8_t * const tx_frame = RBUF_WRITE(tx_rbuf, uint8_t); uint8_t * const tx_frame = RBUF_WRITE0(tx_rbuf, uint8_t);
memset(samples_int, 0, 512); memset(samples_int, 0, 512);
float_to_int16(samples_int, trx_samples, N_SAMPLES, 32767); float_to_int16(samples_int, trx_samples, TX_N_SAMPLES, 32767);
encode_bf1(tx_frame + 22 , samples_int); encode_bf1(tx_frame + 8 , samples_int);
encode_bf1(tx_frame + 22 + 60 , samples_int + 64); encode_bf1(tx_frame + 8 + 60 , samples_int + 64);
encode_bf1(tx_frame + 22 + 120, samples_int + 128); encode_bf1(tx_frame + 8 + 120, samples_int + 128);
encode_bf1(tx_frame + 22 + 180, samples_int + 192); encode_bf1(tx_frame + 8 + 180, samples_int + 192);
*((uint16_t *)(tx_frame + 20)) = htons(seq_id++); *((uint16_t *)(tx_frame + 6)) = htons(seq_id++);
#ifdef TRACE
write_buffer(&tx_trace_buffer, 14, (uint8_t*) RBUF_WRITE0(tx_rbuf, uint8_t), 248);
#endif
rbuf_update_write_index(&tx_rbuf); rbuf_update_write_index(&tx_rbuf);
rbuf_update_read_index(&trx_write_rbuf); rbuf_update_read_index(&trx_write_rbuf);
update_counter(&prepared_counter, 1); update_counter(&encode_counter, 1);
} }
if(left_frames == 0) { if(left_frames == 0) {
trx_wb_part_read_index = (trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS; trx_wb_part_read_index = (trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS;
...@@ -622,64 +526,85 @@ static void *prepare_thread(void *p) { ...@@ -622,64 +526,85 @@ static void *prepare_thread(void *p) {
else { else {
trx_wb_ts[trx_wb_part_read_index] += nb_frames; trx_wb_ts[trx_wb_part_read_index] += nb_frames;
} }
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
} }
// We have sent too much empty frames and missed a timestamp // We have sent too much empty frames and missed a timestamp
else { else if(frames_until_ts < 0) {
log_error("TRX_ECPRI_SEND", "Missed trx_write timestamp: p %015li ts %015li r %015li n %015li e %015li", prepared_counter.counter, ts, rbuf_read_amount(&trx_write_rbuf), n, empty_frames_ahead); log_error("TRX_ECPRI_SEND", "Missed trx_write timestamp: p %015li ts %015li r %015li n %015li e %015li", encode_counter.counter, ts, rbuf_read_amount(&trx_write_rbuf), n, frames_until_ts);
}
} }
else {
// Send empty frames until we receive something
if(!trx_started && !sync_complete) {
pthread_mutex_unlock(&trx_write_mutex);
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(encode_counter.counter > target_counter) {
add_ns(&next, 10000000); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += 38400;
} }
else if(!tx_started) { n = (n > TX_SYNC_BURST_SIZE) ? n : TX_SYNC_BURST_SIZE;
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 6)) = htons(seq_id++); for(int j = 0; j < n; j++) {
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf); rbuf_update_write_index(&tx_rbuf);
update_counter(&prepared_counter, 1);
} }
update_counter(&encode_counter, n);
pthread_mutex_lock(&encode_mutex); pthread_cond_signal(&encode_cond); pthread_mutex_unlock(&encode_mutex);
} }
// Wait for TRX
else { else {
if (!tx_ready_buffer_full) { pthread_cond_wait(&trx_write_cond, &trx_write_mutex);
tx_ready_buffer_full = 1; pthread_mutex_unlock(&trx_write_mutex);
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_signal(&tx_ready_cond);
pthread_mutex_unlock(&tx_ready_mutex);
} }
pthread_mutex_lock(&tx_mutex); }
}
else {
pthread_cond_wait(&tx_cond, &tx_mutex); pthread_cond_wait(&tx_cond, &tx_mutex);
pthread_mutex_unlock(&tx_mutex); pthread_mutex_unlock(&tx_mutex);
} }
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
} }
static void *decompress_thread(void *p) { static void *decode_thread(void *p) {
struct timespec a,b;
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
const float mult = 1. / 32767.; const float mult = 1. / 32767.;
FILE * trace_file_desc; FILE * trace_file_desc;
log_info("DECOMPRESS_THREAD", "Thread init"); log_info("DECOMPRESS_THREAD", "Thread init");
if(s->trace_period) {
if(s->trace_period) uint8_t trace_file_name[256];
trace_file_desc = fopen(s->trace_file, "w+"); memset(trace_file_name, '\0', 256);
sprintf(trace_file_name, "%s/partial-rx.trace", s->log_directory);
trace_file_desc = fopen(trace_file_name, "w+");
}
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->decompress_affinity, &mask); CPU_SET(s->decode_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decompress_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity);
for(int64_t k = 0;;) { for(int64_t k = 0;;) {
pthread_mutex_lock(&rx_mutex);
int n = rbuf_read_amount(&rx_rbuf); int n = rbuf_read_amount(&rx_rbuf);
if(n) { if(!n)
pthread_cond_wait(&rx_cond, &rx_mutex);
pthread_mutex_unlock(&rx_mutex);
for(int j = 0; j < n; j++) { for(int j = 0; j < n; j++) {
int16_t samples_int[N_SAMPLES]; int16_t samples_int[RX_N_SAMPLES];
const uint8_t * dst_mac = RBUF_READ(rx_rbuf, uint8_t); const uint8_t * dst_mac = RBUF_READ0(rx_rbuf, uint8_t);
const uint8_t * src_mac = RBUF_READ(rx_rbuf, uint8_t) + 6; const uint8_t * src_mac = RBUF_READ0(rx_rbuf, uint8_t) + 6;
const uint16_t ether_type = *((uint16_t*) (RBUF_READ(rx_rbuf, uint8_t) + 12)); const uint16_t ether_type = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 12)));
const uint8_t ecpri_protocol_rev = *(RBUF_READ(rx_rbuf, uint8_t) + 14); const uint8_t ecpri_protocol_rev = *(RBUF_READ0(rx_rbuf, uint8_t) + 14);
const uint8_t ecpri_message_type = *(RBUF_READ(rx_rbuf, uint8_t) + 15); const uint8_t ecpri_message_type = *(RBUF_READ0(rx_rbuf, uint8_t) + 15);
const uint16_t ecpri_payload_size = *((uint16_t*) (RBUF_READ(rx_rbuf, uint8_t) + 16)); const uint16_t ecpri_payload_size = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 16)));
const uint16_t pc_id = *((uint16_t*) (RBUF_READ(rx_rbuf, uint8_t) + 18)); const uint16_t pc_id = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 18)));
const uint16_t seq_id = *((uint16_t*) (RBUF_READ(rx_rbuf, uint8_t) + 20)); const uint16_t seq_id = htons(*((uint16_t*) (RBUF_READ0(rx_rbuf, uint8_t) + 20)));
const uint8_t * rx_samples = RBUF_READ(rx_rbuf, uint8_t) + 22; const uint8_t * rx_samples = RBUF_READ0(rx_rbuf, uint8_t) + 22;
if(s->trace_period && !(k % s->trace_period)) { if(s->trace_period && !(k % s->trace_period)) {
fprintf(trace_file_desc, fprintf(trace_file_desc,
...@@ -697,26 +622,23 @@ static void *decompress_thread(void *p) { ...@@ -697,26 +622,23 @@ static void *decompress_thread(void *p) {
} }
k++; k++;
rbuf_update_read_index(&rx_rbuf); rbuf_update_read_index(&rx_rbuf);
#if 1 if((ecpri_payload_size - 4) % 60) {
if(ecpri_payload_size == 0xf400) { fprintf(stderr, "received eCPRI payload of size %u, not a multiple of 60\n", (ecpri_payload_size));
memset((uint8_t * ) samples_int, 0, 512); exit(EXIT_FAILURE);
}
int n_rbuf = rbuf_write_amount(&trx_read_rbuf);
if(n_rbuf < ((ecpri_payload_size - 4) / 60))
log_exit("DECODE_THREAD", "Not enough space to write in trx_read_rbuf (%li < %li)", n_rbuf, ((ecpri_payload_size - 4) / 60));
for(int i = 0; i < (ecpri_payload_size - 4) / 60; i++) {
memset((uint8_t * ) samples_int, 0, sizeof(int16_t) * RX_N_SAMPLES);
decode_bf1(samples_int , rx_samples , 16); decode_bf1(samples_int, rx_samples + i * 60, 16);
decode_bf1(samples_int + 64 , rx_samples + 60, 16);
decode_bf1(samples_int + 128, rx_samples + 120, 16);
decode_bf1(samples_int + 192, rx_samples + 180, 16);
int16_to_float(RBUF_WRITE(trx_read_rbuf, float), samples_int, N_SAMPLES, mult); int16_to_float(RBUF_WRITE0(trx_read_rbuf, float), samples_int, RX_N_SAMPLES, mult);
rbuf_update_write_index(&trx_read_rbuf); rbuf_update_write_index(&trx_read_rbuf);
sem_post(&trx_read_sem);
} }
#endif update_counter(&decode_counter, 1);
} pthread_mutex_lock(&decode_mutex); pthread_cond_signal(&decode_cond); pthread_mutex_unlock(&decode_mutex);
}
else {
pthread_mutex_lock(&rx_mutex);
pthread_cond_wait(&rx_cond, &rx_mutex);
pthread_mutex_unlock(&rx_mutex);
} }
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
...@@ -730,9 +652,13 @@ static void *statistic_thread(void *p) { ...@@ -730,9 +652,13 @@ static void *statistic_thread(void *p) {
log_info("STATISTIC_THREAD", "Thread init"); log_info("STATISTIC_THREAD", "Thread init");
stats_file_desc = fopen(s->stats_file, "w+"); uint8_t stats_file_name[256];
memset(stats_file_name, '\0', 256);
sprintf(stats_file_name, "%s/ecpri.stats", s->log_directory);
stats_file_desc = fopen(stats_file_name, "w+");
if(!stats_file_desc) if(!stats_file_desc)
error(EXIT_FAILURE, errno, "Couldn't open %s\n", s->stats_file); error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name);
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
...@@ -742,24 +668,37 @@ static void *statistic_thread(void *p) { ...@@ -742,24 +668,37 @@ static void *statistic_thread(void *p) {
clock_gettime(CLOCK_TAI, &initial); clock_gettime(CLOCK_TAI, &initial);
next = initial; next = initial;
for(int64_t i = 0;; i++) {
add_ns(&next, STATISTIC_REFRESH_RATE);
if((i % 50) == 0)
fprintf(stats_file_desc, fprintf(stats_file_desc,
"%14s - %14s - %14s - %14s - %14s %14s \n", "%14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s \n",
"prepared", "received",
"decode",
"read", "read",
"write",
"encode",
"sent", "sent",
"received", "received pps",
"pps", "decode pps",
"ppsr"); "read pps",
for(;;) { "write pps",
add_ns(&next, STATISTIC_REFRESH_RATE); "encode pps",
"sent pps");
fprintf(stats_file_desc, fprintf(stats_file_desc,
"%14" PRIi64 " - %14" PRIi64 " - %14" PRIi64 " - %14" PRIi64 " - %14" PRIi64 "pps %14" PRIi64 "pps\n", "%14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 " %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps %14" PRIi64 "pps\n",
prepared_counter.counter, recv_counter.counter,
decode_counter.counter,
read_counter.counter, read_counter.counter,
write_counter.counter,
encode_counter.counter,
sent_counter.counter, sent_counter.counter,
recv_counter.counter, recv_counter.pps,
sent_counter.pps, decode_counter.pps,
recv_counter.pps); read_counter.pps,
write_counter.pps,
encode_counter.pps,
sent_counter.pps);
fflush(stats_file_desc); fflush(stats_file_desc);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL); clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
} }
...@@ -769,18 +708,18 @@ static void *statistic_thread(void *p) { ...@@ -769,18 +708,18 @@ static void *statistic_thread(void *p) {
static int start_threads(TRXEcpriState * s) { static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread; pthread_t recv_pthread;
pthread_t send_pthread; pthread_t send_pthread;
pthread_t prepare_pthread; pthread_t encode_pthread;
pthread_t decompress_pthread; pthread_t decode_pthread;
pthread_t statistic_pthread; pthread_t statistic_pthread;
struct sched_param recv_param; struct sched_param recv_param;
struct sched_param send_param; struct sched_param send_param;
struct sched_param prepare_param; struct sched_param encode_param;
struct sched_param decompress_param; struct sched_param decode_param;
struct sched_param statistic_param; struct sched_param statistic_param;
pthread_attr_t recv_attr; pthread_attr_t recv_attr;
pthread_attr_t send_attr; pthread_attr_t send_attr;
pthread_attr_t prepare_attr; pthread_attr_t encode_attr;
pthread_attr_t decompress_attr; pthread_attr_t decode_attr;
pthread_attr_t statistic_attr; pthread_attr_t statistic_attr;
log_info("TRX_ECPRI", "Starting threads"); log_info("TRX_ECPRI", "Starting threads");
...@@ -813,28 +752,28 @@ static int start_threads(TRXEcpriState * s) { ...@@ -813,28 +752,28 @@ static int start_threads(TRXEcpriState * s) {
if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED)) if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&prepare_attr)) if (pthread_attr_init(&encode_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n"); log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&prepare_attr, PTHREAD_STACK_MIN)) if (pthread_attr_setstacksize(&encode_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n"); log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&prepare_attr, SCHED_FIFO)) if (pthread_attr_setschedpolicy(&encode_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
prepare_param.sched_priority = 97; encode_param.sched_priority = 97;
if (pthread_attr_setschedparam(&prepare_attr, &prepare_param)) if (pthread_attr_setschedparam(&encode_attr, &encode_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n"); log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&prepare_attr, PTHREAD_EXPLICIT_SCHED)) if (pthread_attr_setinheritsched(&encode_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&decompress_attr)) if (pthread_attr_init(&decode_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n"); log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&decompress_attr, PTHREAD_STACK_MIN)) if (pthread_attr_setstacksize(&decode_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n"); log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&decompress_attr, SCHED_FIFO)) if (pthread_attr_setschedpolicy(&decode_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
decompress_param.sched_priority = 97; decode_param.sched_priority = 97;
if (pthread_attr_setschedparam(&decompress_attr, &decompress_param)) if (pthread_attr_setschedparam(&decode_attr, &decode_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n"); log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&decompress_attr, PTHREAD_EXPLICIT_SCHED)) if (pthread_attr_setinheritsched(&decode_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&statistic_attr)) if (pthread_attr_init(&statistic_attr))
...@@ -849,12 +788,13 @@ static int start_threads(TRXEcpriState * s) { ...@@ -849,12 +788,13 @@ static int start_threads(TRXEcpriState * s) {
if (pthread_attr_setinheritsched(&statistic_attr, PTHREAD_EXPLICIT_SCHED)) if (pthread_attr_setinheritsched(&statistic_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_create(&encode_pthread, NULL, encode_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create encode thread");
if (pthread_create(&decode_pthread, NULL, decode_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decode thread");
usleep(1000 * 100);
if (pthread_create(&send_pthread, NULL, send_thread, s)) if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread"); error(EXIT_FAILURE, errno, "Couldn't create send thread");
if (pthread_create(&prepare_pthread, NULL, prepare_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create prepare thread");
if (pthread_create(&decompress_pthread, NULL, decompress_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decompress thread");
if (pthread_create(&recv_pthread, NULL, recv_thread, s)) if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread"); error(EXIT_FAILURE, errno, "Couldn't create recv thread");
if (pthread_create(&statistic_pthread, NULL, statistic_thread, s)) if (pthread_create(&statistic_pthread, NULL, statistic_thread, s))
...@@ -864,12 +804,18 @@ static int start_threads(TRXEcpriState * s) { ...@@ -864,12 +804,18 @@ static int start_threads(TRXEcpriState * s) {
} }
int startdpdk(TRXEcpriState * s) { int startdpdk(TRXEcpriState * s) {
uint8_t ecpri_message[DATA_SIZE]; uint8_t ecpri_message[TX_ECPRI_PACKET_SIZE];
int argc = 1; int argc = 1;
int k = 1; int k = 1;
int prev_space = -1; int prev_space = -1;
char ** argv; uint8_t ** argv;
for(int i = 0; i < 1024; i++)
iq_frame_full[i] = 0xff;
for(int i = 0; i < 1024; i++)
iq_frame_empty[i] = 0x00;
for(int i = 0;; i++) { for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ') if(s->dpdk_options[i] == ' ')
...@@ -877,11 +823,11 @@ int startdpdk(TRXEcpriState * s) { ...@@ -877,11 +823,11 @@ int startdpdk(TRXEcpriState * s) {
else if(s->dpdk_options[i] == '\0') else if(s->dpdk_options[i] == '\0')
break; break;
} }
argv = (char **) malloc(sizeof(char *) * argc); argv = (uint8_t **) malloc(sizeof(uint8_t *) * argc);
for(int i = 0;; i++) { for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ') { if(s->dpdk_options[i] == ' ') {
argv[k] = (char *) malloc(i - prev_space); argv[k] = (uint8_t *) malloc(i - prev_space);
strncpy(argv[k], s->dpdk_options + prev_space + 1, i - prev_space -1); strncpy(argv[k], s->dpdk_options + prev_space + 1, i - prev_space -1);
argv[k][i - prev_space-1] = '\0'; argv[k][i - prev_space-1] = '\0';
prev_space = i; prev_space = i;
...@@ -899,31 +845,35 @@ int startdpdk(TRXEcpriState * s) { ...@@ -899,31 +845,35 @@ int startdpdk(TRXEcpriState * s) {
//set_latency_target(); //set_latency_target();
seq_id = 0; seq_id = 0;
init_counter(&recv_counter);
init_counter(&decode_counter);
init_counter(&read_counter); init_counter(&read_counter);
init_counter(&write_counter);
init_counter(&encode_counter);
init_counter(&sent_counter); init_counter(&sent_counter);
init_counter(&prepared_counter);
init_counter(&recv_counter);
ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000; ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000;
rxtx_buf_size = (3 * ecpri_period_mult); rxtx_buf_size = (3 * ecpri_period_mult);
RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, PACKET_SIZE, uint8_t); RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, DATA_SIZE, uint8_t); RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, TX_ECPRI_PACKET_SIZE, uint8_t);
RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float); RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, RX_N_SAMPLES, float);
RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float); RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, TX_N_SAMPLES, float);
trx_wb_part_read_index = 0; trx_wb_part_read_index = 0;
trx_wb_part_write_index = 0; trx_wb_part_write_index = 0;
pthread_mutex_init(&tx_mutex, NULL); pthread_mutex_init(&tx_mutex, NULL);
pthread_mutex_init(&encode_mutex, NULL);
pthread_mutex_init(&rx_mutex, NULL); pthread_mutex_init(&rx_mutex, NULL);
pthread_mutex_init(&tx_ready_mutex, NULL); pthread_mutex_init(&decode_mutex, NULL);
pthread_cond_init(&tx_cond, NULL); pthread_cond_init(&tx_cond, NULL);
pthread_cond_init(&encode_cond, NULL);
pthread_cond_init(&rx_cond, NULL); pthread_cond_init(&rx_cond, NULL);
pthread_cond_init(&tx_ready_cond, NULL); pthread_cond_init(&decode_cond, NULL);
sem_init(&trx_read_sem, 0, 0); sem_init(&trx_read_sem, 0, 0);
memset((uint8_t *) ecpri_message, 0, DATA_SIZE); memset((uint8_t *) ecpri_message, 0, TX_ECPRI_PACKET_SIZE);
if(sscanf(s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", if(sscanf(s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&d_addr.addr_bytes[0], &d_addr.addr_bytes[0],
...@@ -964,52 +914,84 @@ static void trx_ecpri_end(TRXState *s1) ...@@ -964,52 +914,84 @@ static void trx_ecpri_end(TRXState *s1)
free(s); free(s);
} }
int64_t prev_ts = 0;
int64_t prev_count = 0;
#define M 32
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md) static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{ {
(void) s1; (void) s1;
float ** _samples = (float **) __samples; float ** _samples = (float **) __samples;
int write_count = count / 256; int write_count = count / M;
int64_t ts = timestamp / 256; int64_t ts = timestamp / M;
log_limit("TRX_ECPRI_WRITE", "timestamp = %li, next = %li, write_count = %li, samples = %s", ts, ts + write_count, write_count, __samples ? "yes" : "no");
if(!__samples)
return;
pthread_mutex_lock(&trx_write_mutex); int n_rbuf = rbuf_write_amount(&trx_write_rbuf);
pthread_cond_signal(&trx_write_cond); if(write_count > n_rbuf)
pthread_mutex_unlock(&trx_write_mutex); log_exit("TRX_ECPRI_WRITE", "Not enough space in trx_write_rbuf (%li > %li)", write_count, n_rbuf);
if(prev_count && (ts - prev_ts) != prev_count)
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps");
prev_ts = ts;
prev_count = write_count;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index; trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index;
trx_wb_ts[trx_wb_part_write_index] = ts; trx_wb_ts[trx_wb_part_write_index] = ts;
for(int k = 0; k < write_count; k++) { for(int k = 0; k < write_count; k++) {
for(int i = 0; i < 4; i++) if(__samples) {
for(int i = 0; i < 4; i++) {
for(int j = 0; j < 64; j++) { for(int j = 0; j < 64; j++) {
RBUF_WRITE(trx_write_rbuf, float)[i * 64 + j] = _samples[i][j + (k * 64)]; RBUF_WRITE(trx_write_rbuf, k, float)[i * 64 + j] = _samples[i][j + (k * 64)];
} }
rbuf_update_write_index(&trx_write_rbuf); }
}
else {
memcpy((uint8_t *) RBUF_WRITE(trx_write_rbuf, k, float), iq_frame_empty, sizeof(float) * 4 * 64);
}
#ifdef TRACE
if(__samples)
write_buffer(&trxw_trace_buffer, 0, iq_frame_full, 1024);
else
write_buffer(&trxw_trace_buffer, 0, iq_frame_empty, 1024);
#endif
} }
trx_wb_part_write_index = (trx_wb_part_write_index + 1) % TRX_WB_MAX_PARTS; trx_wb_part_write_index = (trx_wb_part_write_index + 1) % TRX_WB_MAX_PARTS;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index; trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index + write_count;
// Update write index at the end so that everything stays consistent
for(int k = 0; k < write_count; k++)
rbuf_update_write_index(&trx_write_rbuf);
update_counter(&write_counter, write_count);
pthread_mutex_lock(&trx_write_mutex);
pthread_cond_signal(&trx_write_cond);
pthread_mutex_unlock(&trx_write_mutex);
} }
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md) static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{ {
(void) s1; (void) s1;
float ** _samples = (float **) __samples; float ** _samples = (float **) __samples;
int read_count = (count / 256); int read_count = (count / M);
log_limit("TRX_ECPRI_READ", "count = %ld", count); log_limit("TRX_ECPRI_READ", "count = %ld", count);
pthread_mutex_lock(&decode_mutex);
while(rbuf_read_amount(&trx_read_rbuf) < read_count) {
pthread_cond_wait(&decode_cond, &decode_mutex);
}
pthread_mutex_unlock(&decode_mutex);
sync_complete = 1;
for(int k = 0; k < read_count; k++) { for(int k = 0; k < read_count; k++) {
float * trx_samples; float * trx_samples;
sem_wait(&trx_read_sem); trx_samples = RBUF_READ0(trx_read_rbuf, float);
trx_samples = RBUF_READ(trx_read_rbuf, float); for(int i = 0; i < 64; i++)
for(int i = 0; i < 256; i++)
_samples[0][i] = trx_samples[i]; _samples[0][i] = trx_samples[i];
#ifdef TRACE
write_buffer(&trxr_trace_buffer, 0, (uint8_t *) trx_samples, 64 * sizeof(float));
#endif
rbuf_update_read_index(&trx_read_rbuf); rbuf_update_read_index(&trx_read_rbuf);
} }
*ptimestamp = read_counter.counter * 256; *ptimestamp = read_counter.counter * M;
update_counter(&read_counter, read_count); update_counter(&read_counter, read_count);
return count; return count;
...@@ -1042,6 +1024,14 @@ static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params) ...@@ -1042,6 +1024,14 @@ static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
} }
void dummy_enb_init(TRXState *s1, TRXEcpriState *s) { void dummy_enb_init(TRXState *s1, TRXEcpriState *s) {
#ifdef TRACE
init_buffer(&rx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&tx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxw_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxr_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
#endif
s1->trx_write_func2 = trx_ecpri_write; s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read; s1->trx_read_func2 = trx_ecpri_read;
startdpdk(s); startdpdk(s);
...@@ -1068,14 +1058,21 @@ int trx_driver_init(TRXState *s1) ...@@ -1068,14 +1058,21 @@ int trx_driver_init(TRXState *s1)
s = malloc(sizeof(TRXEcpriState)); s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s)); memset(s, 0, sizeof(*s));
#ifdef TRACE
init_buffer(&rx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&tx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxw_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxr_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
#endif
trx_get_param_double(s1, &val, "recv_affinity"); trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val; s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity"); trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val; s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "prepare_affinity"); trx_get_param_double(s1, &val, "encode_affinity");
s->prepare_affinity = (int) val; s->encode_affinity = (int) val;
trx_get_param_double(s1, &val, "decompress_affinity"); trx_get_param_double(s1, &val, "decode_affinity");
s->decompress_affinity = (int) val; s->decode_affinity = (int) val;
trx_get_param_double(s1, &val, "statistic_affinity"); trx_get_param_double(s1, &val, "statistic_affinity");
s->statistic_affinity = (int) val; s->statistic_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id"); trx_get_param_double(s1, &val, "flow_id");
...@@ -1093,8 +1090,7 @@ int trx_driver_init(TRXState *s1) ...@@ -1093,8 +1090,7 @@ int trx_driver_init(TRXState *s1)
s->rec_mac = trx_get_param_string(s1, "rec_mac"); s->rec_mac = trx_get_param_string(s1, "rec_mac");
s->rec_if = trx_get_param_string(s1, "rec_if"); s->rec_if = trx_get_param_string(s1, "rec_if");
s->dpdk_options = trx_get_param_string(s1, "dpdk_options"); s->dpdk_options = trx_get_param_string(s1, "dpdk_options");
s->trace_file = trx_get_param_string(s1, "trace_file"); s->log_directory = trx_get_param_string(s1, "log_directory");
s->stats_file = trx_get_param_string(s1, "stats_file");
s1->opaque = s; s1->opaque = s;
s1->trx_end_func = trx_ecpri_end; s1->trx_end_func = trx_ecpri_end;
......
static void log_error(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " ERROR [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
exit(EXIT_FAILURE);
}
static volatile int64_t limit_counter = 0;
static inline void log_limit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
if(limit_counter++ % 1000000)
return;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " DEBUG [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
static void log_info(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " INFO [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
static void log_exit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " EXIT [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
fprintf(stderr, "%s\n", line);
fflush(stdout);
fflush(stderr);
exit(EXIT_FAILURE);
}
#ifdef DEBUG
static void log_debug(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " DEBUG [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#else
#define log_debug(...)
#endif
static int latency_target_fd = -1;
static int32_t latency_target_value = 0;
/* Latency trick
* if the file /dev/cpu_dma_latency exists,
* open it and write a zero into it. This will tell
* the power management system not to transition to
* a high cstate (in fact, the system acts like idle=poll)
* When the fd to /dev/cpu_dma_latency is closed, the behavior
* goes back to the system default.
*
* Documentation/power/pm_qos_interface.txt
*/
void set_latency_target(void) {
struct stat s;
int err;
errno = 0;
err = stat("/dev/cpu_dma_latency", &s);
if (err == -1) {
error(EXIT_FAILURE, errno, "WARN: stat /dev/cpu_dma_latency failed");
return;
}
errno = 0;
latency_target_fd = open("/dev/cpu_dma_latency", O_RDWR);
if (latency_target_fd == -1) {
error(EXIT_FAILURE, errno, "WARN: open /dev/cpu_dma_latency");
return;
}
errno = 0;
err = write(latency_target_fd, &latency_target_value, 4);
if (err < 1) {
error(EXIT_FAILURE, errno, "# error setting cpu_dma_latency to %d!",
latency_target_value);
close(latency_target_fd);
return;
}
printf("# /dev/cpu_dma_latency set to %dus\n", latency_target_value);
}
// Timestamps utils
#define NSEC_PER_SEC INT64_C(1000000000)
static struct timespec int_to_ts(int64_t t) {
struct timespec ts;
ts.tv_sec = t / NSEC_PER_SEC;
ts.tv_nsec = t - (ts.tv_sec * NSEC_PER_SEC);
return ts;
}
static int64_t ts_to_int(struct timespec ts) {
return ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec;
}
static void add_ns(struct timespec *t, int64_t ns) {
t->tv_nsec += ns;
while (t->tv_nsec >= ((int64_t)NSEC_PER_SEC)) {
t->tv_sec += 1;
t->tv_nsec -= NSEC_PER_SEC;
}
}
static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
int64_t diff;
diff = NSEC_PER_SEC * ((int)t1.tv_sec - (int)t2.tv_sec);
diff += ((int)t1.tv_nsec - (int)t2.tv_nsec);
return diff;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment