Commit e46b2cfe authored by Joanne Hugé

Spectrum Analyzer can see signal in all 10MHz bandwidth (SISO)

parent b5aca5fd
/* DPDK */
#define MEMPOOL_CACHE_SIZE 256
#define RTE_TEST_RX_DESC_DEFAULT 1024
#define RTE_TEST_TX_DESC_DEFAULT 1024
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
struct rte_mempool *tx_mbuf_pool;
struct rte_mempool *rx_mbuf_pool;
struct rte_ether_addr s_addr;
struct rte_ether_addr d_addr;
static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_lro_pkt_size = RTE_ETHER_MAX_LEN }
};
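/* Note: this rxmode field is DPDK-version dependent; older DPDK releases
 * exposed .max_rx_pkt_len here instead of .max_lro_pkt_size. */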
static inline int port_init(int portid, struct rte_mempool *rx_mbuf_pool) {
struct rte_eth_conf port_conf = port_conf_default;
const uint16_t rx_rings = 1, tx_rings = 1;
int retval;
uint16_t q;
retval = rte_eth_dev_configure(portid, rx_rings, tx_rings, &port_conf);
if (retval != 0)
return retval;
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, rx_mbuf_pool);
if (retval < 0)
return retval;
}
/* Allocate and set up 1 TX queue per Ethernet port. */
for (q = 0; q < tx_rings; q++) {
retval = rte_eth_tx_queue_setup(portid, q, nb_txd,
rte_eth_dev_socket_id(portid), NULL);
if (retval < 0)
return retval;
}
/* Start the Ethernet port. */
retval = rte_eth_dev_start(portid);
if (retval < 0)
return retval;
return 0;
}
static void init_dpdk(int argc, char ** argv) {
unsigned int nb_mbufs;
int ret;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "initlize fail!");
argc -= ret;
argv += ret;
nb_mbufs = RTE_MAX((nb_rxd + nb_txd + BURST_SIZE + MEMPOOL_CACHE_SIZE), 8192U);
nb_mbufs = 1024U * 16 - 1;
tx_mbuf_pool = rte_pktmbuf_pool_create("TX_MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, SOCKET_ID_ANY);
if (tx_mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create tx mbuf pool\n");
rx_mbuf_pool = rte_pktmbuf_pool_create("RX_MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, SOCKET_ID_ANY);
if (rx_mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create rx mbuf pool\n");
if (port_init(0, rx_mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 0);
}
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "private/trx_driver.h"
//#define DEBUG // Enables / deactivates log_debug
//#define TRACE
#define MONITOR
#define RECV_STOP_THRESHOLD 3
#define PPS_UPDATE_PERIOD INT64_C(1000000000)
#include "utils.c"
#define EFREQ 38400
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 150)
#define START_SENDING
#define RX_N_CHANNEL 1
#define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
#define TX_PACKET_SIZE 262
#define RX_MAX_PACKET_SIZE 262
#define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14)
#define N_SAMPLES (32)
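/* Frame layout implied by the constants above and the encode/decode loops below:
 * 262 bytes = 14 (Ethernet header) + 4 (eCPRI common header) + 2 (PC_ID) + 2 (SEQ_ID)
 *           + 4 channels * 60 bytes of compressed IQ, i.e. 32 samples per channel per frame. */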
#define TRX_MAX_GROUP 1500
#define TRX_BUF_MAX_SIZE 500000
#define TXRX_BUF_MAX_SIZE 500000
#define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000)
#define STAT_INT_LEN "9"
typedef struct {
float re;
float im;
} Complex;
typedef struct {
const uint8_t * re_mac;
const uint8_t * rec_mac;
const uint8_t * rec_if;
const char * dpdk_options;
const uint8_t * log_directory;
int recv_affinity;
int send_affinity;
int encode_affinity;
int decode_affinity;
int statistic_affinity;
int ecpri_period;
int flow_id;
int sample_rate;
} TRXEcpriState;
typedef struct {
int64_t counter;
int64_t pps_counter;
int64_t pps_ts;
int64_t pps;
} counter_stat_t;
typedef struct {
volatile void * buffer;
char name[64];
int buf_len;
int len;
volatile int write_index;
volatile int read_index;
volatile int write_ahead;
} ring_buffer_t;
typedef struct {
int64_t count;
uint8_t wait;
uint8_t zeroes;
} sample_group_t;
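/* As used by trx_ecpri_write() and encode_thread() below:
 * count  - number of frames in the group,
 * zeroes - the frames carry no IQ data and are encoded as zero-filled payloads,
 * wait   - count holds an absolute frame timestamp that still has to be
 *          reduced by the number of frames already encoded. */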
/* Proprietary code:
- compression / decompression of IQ samples
- fast conversion between int16_t and float
*/
#include "private/bf1_avx2.c"
// Buffers
static ring_buffer_t rx_rbuf; // Received packets
static ring_buffer_t trxr_rbuf[RX_N_CHANNEL]; // Decoded IQ samples
static ring_buffer_t tx_rbuf; // Packets to send
static ring_buffer_t trxw_rbuf[TX_N_CHANNEL]; // Uncompressed IQ samples
static ring_buffer_t trxw_group_rbuf; // Group of IQ samples
// Counters
static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // received frames dropped (rx_rbuf full)
static volatile counter_stat_t tx_drop_counter; // TRX write frames dropped (trxw_rbuf full)
static volatile int sync_complete = 0;
static volatile int sync_happened = 0;
static int first_trx_write = 1;
#ifdef TRACE
static volatile int rx_trace_ready = 0;
static volatile int tx_trace_ready = 1;
static int tx_trace_index_start = 0;
#endif
// Network
static volatile int seq_id;
static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
}
static void rbuf_update_read_index(ring_buffer_t * rbuf) {
rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len;
}
static int rbuf_read_amount(ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
}
static int rbuf_write_amount(ring_buffer_t * rbuf) {
// Don't write everything to avoid the write index catching up to the read index
// That way we don't have to use locks
return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len);
}
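/* Illustrative example: with buf_len = 8, read_index = 2 and write_index = 6,
 * rbuf_read_amount() = (6 + 8 - 2) % 8 = 4 and rbuf_write_amount() = (2 + 8 - 6 - 1) % 8 = 3,
 * so the read and write amounts always sum to buf_len - 1 and the write index can
 * never overrun the read index in this single-producer / single-consumer scheme. */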
static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) {
int ret = n;
if(rbuf1) {
n = rbuf1->buf_len - rbuf1->read_index;
ret = n < ret ? n : ret;
}
if(rbuf2)
n = rbuf2->buf_len - rbuf2->write_index;
return n < ret ? n : ret;
}
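/* rbuf_contiguous_copy() returns the largest chunk (at most n) that can be processed
 * without wrapping: n is clamped to the distance from rbuf1's read_index and from
 * rbuf2's write_index to the end of their respective buffers. */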
#define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_WRITE(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.write_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len * sizeof(type)));\
rbuf.buffer = (type *) malloc(_buf_len * _len * sizeof(type));\
strcpy(rbuf.name, _name);\
rbuf.buf_len = _buf_len;\
rbuf.len = _len;\
rbuf.write_index = 0;\
rbuf.read_index = 0;\
rbuf.write_ahead = 0;\
} while(0)
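/* Typical use (see startdpdk() below), e.g.
 *   RBUF_INIT(rx_rbuf, "RX ring buffer", TXRX_BUF_MAX_SIZE, RX_MAX_PACKET_SIZE, uint8_t);
 * which allocates buf_len * len elements of the given type and resets the indexes. */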
static void print_stats(FILE * f, int print_header) {
if(print_header) {
fprintf(f,
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"\n",
"rx dropped",
"tx dropped",
"received",
"decode",
"read",
"write",
"encode",
"sent",
"received pps",
"decode pps",
"read pps",
"write pps",
"encode pps",
"sent pps");
}
fprintf(f,
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"\n",
rx_drop_counter.counter,
tx_drop_counter.counter,
recv_counter.counter,
decode_counter.counter,
read_counter.counter,
write_counter.counter,
encode_counter.counter,
sent_counter.counter,
recv_counter.pps,
decode_counter.pps,
read_counter.pps,
write_counter.pps,
encode_counter.pps,
sent_counter.pps);
}
static void log_exit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " EXIT [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
fprintf(stderr, "%s\n", line);
// Dump useful information
print_stats(stderr, 1);
fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf));
fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf));
fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0]));
fprintf(stderr, "TRXW GROUP RBUF: ri %d wi %d ra %d wa %d\n", trxw_group_rbuf.read_index, trxw_group_rbuf.write_index, rbuf_read_amount(&trxw_group_rbuf), rbuf_write_amount(&trxw_group_rbuf));
fflush(stdout);
fflush(stderr);
exit(EXIT_FAILURE);
}
#define BURST_SIZE 16
#define TX_POOL_SIZE 16
int8_t tx_data[BURST_SIZE][TX_PACKET_SIZE];
struct rte_mbuf {
int buf_addr;
int data_off;
};
static void send_packets(int port) {
struct rte_mbuf * pkt[TX_POOL_SIZE];
struct rte_ether_hdr *eth_hdr;
uint16_t nb_tx = 0;
for(int i = 0; i < TX_POOL_SIZE; i++) {
int pkt_size;
pkt[i] = rte_pktmbuf_alloc(tx_mbuf_pool);
eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*);
if(port) {
eth_hdr->d_addr = s_addr;
eth_hdr->s_addr = d_addr;
} else {
eth_hdr->d_addr = d_addr;
eth_hdr->s_addr = s_addr;
}
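/* 0xAEFE is the EtherType assigned to eCPRI over Ethernet. */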
eth_hdr->ether_type = htons(0xaefe);
memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), tx_data[i], TX_ECPRI_PACKET_SIZE);
pkt_size = TX_PACKET_SIZE;
pkt[i]->data_len = pkt_size;
pkt[i]->pkt_len = pkt_size;
}
while(nb_tx < TX_POOL_SIZE) {
int64_t x = TX_POOL_SIZE - nb_tx;
nb_tx += rte_eth_tx_burst(port, 0, pkt + nb_tx, x > BURST_SIZE ? BURST_SIZE : x);
}
/* Free any unsent packets. */
if (nb_tx < BURST_SIZE) {
uint16_t buf;
for (buf = nb_tx; buf < BURST_SIZE; buf++)
rte_pktmbuf_free(pkt[buf]);
log_exit("SEND_THREAD", "Sent %d packets instead of %d", nb_tx, BURST_SIZE);
}
}
static void init_counter(volatile counter_stat_t * c) {
c->counter = 0;
c->pps_counter = 0;
c->pps_ts = 0;
c->pps = 0;
}
static void update_counter_pps(volatile counter_stat_t * c) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_TAI, &_ts);
ts = ts_to_int(_ts);
if((ts - c->pps_ts) > PPS_UPDATE_PERIOD) {
if(c->pps_ts)
c->pps = ((c->counter - c->pps_counter) * NSEC_PER_SEC) / (ts - c->pps_ts);
c->pps_counter = c->counter;
c->pps_ts = ts;
}
}
static void update_counter(volatile counter_stat_t * c, int64_t v) {
c->counter += v;
}
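/* pps is a packets-per-second estimate: every PPS_UPDATE_PERIOD (1 s) the counter
 * delta is scaled by NSEC_PER_SEC over the elapsed time since the last update. */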
#ifdef TRACE
static void trace_handler(struct timespec initial, TRXEcpriState * s) {
struct timespec next;
if(tx_trace_ready && rx_trace_ready) {
int64_t d;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
log_info("TRACE", "Packets sent: %" PRIi64, sent_counter.counter);
log_info("TRACE", "Duration: %" PRIi64, d);
log_info("TRACE", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
FILE * f;
char n[256];
uint8_t ones[14];
for(int i = 0; i < 14; i++)
ones[i] = 0xff;
memset(n, '\0', 256);
sprintf(n, "%s/tx.trace", s->log_directory);
f = fopen(n, "wb+");
log_info("TRACE", "Writing %d frames to tx.trace", tx_rbuf.write_index + tx_rbuf.buf_len - tx_trace_index_start);
for(int i = tx_trace_index_start; i != tx_rbuf.write_index; i = (i + 1) % tx_rbuf.buf_len) {
fwrite(ones, 14, 1, f);
fwrite(((uint8_t*) tx_rbuf.buffer) + i * tx_rbuf.len, tx_rbuf.len, 1, f);
}
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/rx.trace", s->log_directory);
f = fopen(n, "wb+");
log_info("TRACE", "Writing %d frames to rx.trace", rx_rbuf.write_index);
for(int i = 0; i < rx_rbuf.write_index; i++) {
fwrite(((uint8_t*) rx_rbuf.buffer) + i * rx_rbuf.len, rx_rbuf.len, 1, f);
}
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxw.trace", s->log_directory);
f = fopen(n, "wb+");
log_info("TRACE", "Writing %d frames to trxw.trace", trxw_rbuf[0].write_index);
for(int i = 0; i < trxw_rbuf[0].write_index; i++) {
for(int j = 0; j < TX_N_CHANNEL; j++)
fwrite((uint8_t *) (((Complex *) trxw_rbuf[j].buffer) + i * trxw_rbuf[0].len), trxw_rbuf[0].len * sizeof(Complex), 1, f);
}
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxr.trace", s->log_directory);
f = fopen(n, "wb+");
log_info("TRACE", "Writing %d frames to trxr.trace", trxr_rbuf[0].write_index);
for(int i = 0; i < trxr_rbuf[0].write_index; i++) {
for(int j = 0; j < RX_N_CHANNEL; j++)
fwrite((uint8_t *) (((Complex *) trxr_rbuf[j].buffer) + i * trxr_rbuf[0].len), trxr_rbuf[0].len * sizeof(Complex), 1, f);
}
fclose(f);
log_exit("", "Finished tracing");
}
}
#endif
static void *recv_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int first_seq_id = 1;
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
#define RTE_MBUF_SIZE 20000
#define MIN_RX 10000
for(;;) {
struct rte_mbuf * pkt[RTE_MBUF_SIZE];
uint8_t * buf, * rtebuf;
int port = 0;
int nb_rx = 0;
int n;
int drop_packet = 0;
while(!nb_rx)
nb_rx = recv_packets(port, 0, pkt + nb_rx, 1024);
n = rbuf_write_amount(&rx_rbuf);
drop_packet = nb_rx > n;
if(drop_packet) {
for(int i = 0; i < nb_rx; i++)
rte_pktmbuf_free(pkt[i]);
if(nb_rx)
update_counter(&rx_drop_counter, nb_rx);
}
else {
int nc; int nr;
nr = nb_rx;
while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) {
#ifdef TRACE
if((rx_rbuf.write_index + nc) >= rx_rbuf.buf_len) {
log_info("RECV_THREAD", "RX Trace ready");
rx_trace_ready = 1;
pthread_exit(EXIT_SUCCESS);
}
#endif
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
for(int i = 0; i < nc; i++) {
rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off;
if(first_seq_id) {
uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
printf("seq_id = %d\n", seq_id);
first_seq_id = 0;
}
memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len);
}
rx_rbuf.write_index = (rx_rbuf.write_index + nc) % rx_rbuf.buf_len;
for(int i = 0; i < nc; i++)
rte_pktmbuf_free(pkt[i]);
nr -= nc;
}
}
update_counter(&recv_counter, nb_rx);
}
pthread_exit(EXIT_SUCCESS);
}
// Send as soon as packets are encoded
// Signal to encode thread that packets have been sent
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) {
int64_t n = rbuf_read_amount(&tx_rbuf);
if(n >= BURST_SIZE) {
int nb_burst = n / BURST_SIZE;
for(int j = 0; j < nb_burst; j++) {
for(int k = 0; k < BURST_SIZE; k++) {
memcpy(tx_data[k], RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len);
rbuf_update_read_index(&tx_rbuf);
}
send_packets(0);
}
update_counter(&sent_counter, nb_burst * BURST_SIZE);
}
}
pthread_exit(EXIT_SUCCESS);
}
/*
If sync has happened (= we have received frames):
Prepare as soon as TRX has packet to write
Signal
Else:
Prepare as soon as there is space in tx buffer
*/
#define TX_SYNC_BURST_SIZE 512
static void *encode_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
struct timespec next;
int64_t target_counter = 0;
int reset_encode_counter = 1;
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->encode_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);
for(int64_t i = 0;; i++) {
int n;
if(sync_complete && reset_encode_counter) {
encode_counter.counter = 0;
reset_encode_counter = 0;
seq_id = 0;
#ifdef TRACE
tx_trace_index_start = tx_rbuf.write_index;
#endif
}
// If we have frames to encode (is there space in TX buffer)
n = rbuf_write_amount(&tx_rbuf);
if(n) {
// If there are frames from trx_write callback to encode
if(rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
sample_group_t * g; int nb_frames;
g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
if(g->wait) {
g->wait = 0;
g->count -= encode_counter.counter;
g->zeroes = 1;
}
nb_frames = g->count > n ? n : g->count;
g->count -= nb_frames;
#ifdef TRACE
if((encode_counter.counter + nb_frames) >= tx_rbuf.buf_len) {
log_info("ENCODE_THREAD", "TX Trace ready");
tx_trace_ready = 1;
pthread_exit(EXIT_SUCCESS);
}
#endif
if(g->zeroes) {
for(int j = 0; j < nb_frames; j++) {
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nb_frames) % trxw_rbuf[0].buf_len;
} else {
int nc;
int nf = nb_frames;
while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) {
Complex * iq_samples[4];
uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8;
for(int j = 0; j < TX_N_CHANNEL; j++)
iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len);
for(int i = 0; i < nc; i++) {
for(int i = 0; i < TX_N_CHANNEL ; i++)
encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]);
*((uint16_t *)(buf - 2)) = htons(seq_id++);
for(int j = 0; j < TX_N_CHANNEL; j++)
iq_samples[j] += trxw_rbuf[0].len;
buf += tx_rbuf.len;
}
tx_rbuf.write_index = (tx_rbuf.write_index + nc) % tx_rbuf.buf_len;
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len;
nf -= nc;
}
if(nf)
exit(EXIT_FAILURE);
}
update_counter(&encode_counter, nb_frames);
if(!g->count) {
rbuf_update_read_index(&trxw_group_rbuf);
}
}
else {
// Send empty frames until we receive something
#ifdef START_SENDING
if(!sync_complete) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(encode_counter.counter > target_counter) {
int k = (encode_counter.counter - target_counter + EFREQ - 1) / EFREQ;
add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * EFREQ;
}
n = (n > TX_SYNC_BURST_SIZE) ? n : TX_SYNC_BURST_SIZE;
n = (n < EFREQ) ? n : EFREQ;
for(int j = 0; j < n; j++) {
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
update_counter(&encode_counter, n);
}
#endif
}
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *decode_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("DECODE_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->decode_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity);
for(;;) {
int n;
while(!(n = rbuf_read_amount(&rx_rbuf)));
while(rbuf_write_amount(&trxr_rbuf[0]) < n);
int nc;
while((nc = rbuf_contiguous_copy(&rx_rbuf, &trxr_rbuf[0], n))) {
uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22;
#ifdef TRACE
if((trxr_rbuf[0].write_index + nc) >= trxr_rbuf[0].buf_len) {
rx_trace_ready = 1;
log_info("DECODE_THREAD", "RX Trace ready");
pthread_exit(EXIT_SUCCESS);
}
#endif
Complex * iq_samples[4];
for(int i = 0; i < RX_N_CHANNEL; i++)
iq_samples[i] = (((Complex *) trxr_rbuf[i].buffer) + (trxr_rbuf[0].write_index * trxr_rbuf[0].len));
for(int i = 0; i < nc; i++) {
for(int j = 0; j < RX_N_CHANNEL ; j++) {
decode_s64_b60_2((float *) (iq_samples[j] + i * 32), buf + j * 60 + i * rx_rbuf.len);
}
}
trxr_rbuf[0].write_index = (trxr_rbuf[0].write_index + nc) % trxr_rbuf[0].buf_len;
rx_rbuf.read_index = (rx_rbuf.read_index + nc) % rx_rbuf.buf_len;
n -= nc;
update_counter(&decode_counter, nc);
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *statistic_thread(void *p) {
struct timespec next, initial;
int64_t recv_stop = 0;
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
FILE * stats_file_desc;
log_info("STATISTIC_THREAD", "Thread init");
char stats_file_name[256];
memset(stats_file_name, '\0', 256);
sprintf(stats_file_name, "%s/ecpri.stats", s->log_directory);
stats_file_desc = fopen(stats_file_name, "w+");
if(!stats_file_desc)
error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name);
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->statistic_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->statistic_affinity);
clock_gettime(CLOCK_TAI, &initial);
next = initial;
for(int64_t i = 0;; i++) {
add_ns(&next, STATISTIC_REFRESH_RATE);
#ifdef TRACE
trace_handler(initial, s);
#endif
print_stats(stats_file_desc, (i % 50) == 0);
#ifdef DEBUG
fprintf(stats_file_desc,
"%d %d %d %d %d %d %d %d\n",
rx_rbuf.write_index,
rx_rbuf.read_index,
trxr_rbuf[0].write_index,
trxr_rbuf[0].read_index,
trxw_rbuf[0].write_index,
trxw_rbuf[0].read_index,
tx_rbuf.write_index,
tx_rbuf.read_index);
fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
#endif
fflush(stats_file_desc);
update_counter_pps(&rx_drop_counter);
update_counter_pps(&tx_drop_counter);
update_counter_pps(&recv_counter);
update_counter_pps(&decode_counter);
update_counter_pps(&read_counter);
update_counter_pps(&write_counter);
update_counter_pps(&encode_counter);
update_counter_pps(&sent_counter);
#ifdef MONITOR
if(recv_counter.pps < 3000000) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_MONOTONIC, &_ts);
ts = ts_to_int(_ts);
if(sync_happened && (recv_stop && ((ts - recv_stop) > RECV_STOP_THRESHOLD * INT64_C(1000000000)))) {
log_info("MONITOR", "Stopped recieving packets, sending again...");
sync_complete = 0;
recv_stop = 0;
}
if(!recv_stop)
recv_stop = ts;
}
#endif
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t send_pthread;
pthread_t encode_pthread;
pthread_t decode_pthread;
pthread_t statistic_pthread;
struct sched_param recv_param;
struct sched_param send_param;
struct sched_param encode_param;
struct sched_param decode_param;
struct sched_param statistic_param;
pthread_attr_t recv_attr;
pthread_attr_t send_attr;
pthread_attr_t encode_attr;
pthread_attr_t decode_attr;
pthread_attr_t statistic_attr;
log_info("TRX_ECPRI", "Starting threads");
// Initialize pthread attributes (default values)
if (pthread_attr_init(&recv_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
// Set a specific stack size
if (pthread_attr_setstacksize(&recv_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
// Set scheduler policy and priority of pthread
if (pthread_attr_setschedpolicy(&recv_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv_attr, &recv_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
/* Use scheduling parameters of attr */
if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&send_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
send_param.sched_priority = 97;
if (pthread_attr_setschedparam(&send_attr, &send_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&encode_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&encode_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&encode_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
encode_param.sched_priority = 97;
if (pthread_attr_setschedparam(&encode_attr, &encode_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&encode_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&decode_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&decode_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&decode_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
decode_param.sched_priority = 97;
if (pthread_attr_setschedparam(&decode_attr, &decode_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&decode_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&statistic_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&statistic_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&statistic_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
statistic_param.sched_priority = 97;
if (pthread_attr_setschedparam(&statistic_attr, &statistic_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&statistic_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_create(&statistic_pthread, NULL, statistic_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create statistic thread");
usleep(1000 * 20);
if (pthread_create(&encode_pthread, NULL, encode_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create encode thread");
usleep(1000 * 20);
if (pthread_create(&decode_pthread, NULL, decode_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decode thread");
usleep(1000 * 20);
if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread");
usleep(1000 * 500);
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
return 0;
}
int startdpdk(TRXEcpriState * s) {
uint8_t ecpri_message[TX_ECPRI_PACKET_SIZE];
int argc = 1;
int k = 1;
int prev_space = -1;
char ** argv;
for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ')
argc++;
else if(s->dpdk_options[i] == '\0')
break;
}
argv = (char **) malloc(sizeof(char *) * argc);
for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ') {
argv[k] = (char *) malloc(i - prev_space);
strncpy(argv[k], s->dpdk_options + prev_space + 1, i - prev_space -1);
argv[k][i - prev_space-1] = '\0';
prev_space = i;
k++;
}
else if(s->dpdk_options[i] == '\0') {
break;
}
}
argv[0] = "";
log_info("TRX_ECPRI", "Start");
//set_latency_target();
seq_id = 0;
init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter);
init_counter(&recv_counter);
init_counter(&decode_counter);
init_counter(&read_counter);
init_counter(&write_counter);
init_counter(&encode_counter);
init_counter(&sent_counter);
RBUF_INIT(rx_rbuf, "RX ring buffer", TXRX_BUF_MAX_SIZE, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", TXRX_BUF_MAX_SIZE, TX_ECPRI_PACKET_SIZE, uint8_t);
for(int i = 0; i < TX_N_CHANNEL; i++) {
char s[256];
sprintf(s, "TRXWrite Ring Buffer %d", i);
RBUF_INIT(trxw_rbuf[i], s, TRX_BUF_MAX_SIZE, N_SAMPLES, Complex);
}
for(int i = 0; i < RX_N_CHANNEL; i++) {
char s[256];
sprintf(s, "TRXRead Ring Buffer %d", i);
RBUF_INIT(trxr_rbuf[i], s, TRX_BUF_MAX_SIZE, N_SAMPLES, Complex);
}
RBUF_INIT(trxw_group_rbuf, "TRXGroupWrite ring buffer", TRX_MAX_GROUP, 1, sample_group_t);
memset((uint8_t *) ecpri_message, 0, TX_ECPRI_PACKET_SIZE);
#ifdef DPDK
if(sscanf((char *) s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&d_addr.addr_bytes[0],
&d_addr.addr_bytes[1],
&d_addr.addr_bytes[2],
&d_addr.addr_bytes[3],
&d_addr.addr_bytes[4],
&d_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid eRE MAC address\n");
if(sscanf((char *) s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&s_addr.addr_bytes[0],
&s_addr.addr_bytes[1],
&s_addr.addr_bytes[2],
&s_addr.addr_bytes[3],
&s_addr.addr_bytes[4],
&s_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid eREC MAC address\n");
#endif
/* Standard Header */
ecpri_message[0] = 0x10; // Protocol data revision 0x1, C = 0
// Message type = 0x00, IQ data
// Payload size
*((uint16_t *) (ecpri_message + 2)) = htons(244);
*((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id);
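/* Resulting eCPRI message layout (offsets into ecpri_message, as filled here and in
 * encode_thread): [0] revision/flags, [1] message type 0 (IQ data), [2..3] payload
 * size (244), [4..5] PC_ID / flow id, [6..7] sequence id, [8..] 240 bytes of
 * compressed IQ data. */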
for(int i = 0; i < tx_rbuf.buf_len; i++)
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len);
start_threads(s);
return 0;
}
static void trx_ecpri_end(TRXState *s1)
{
log_info("TRX_ECPRI", "End");
TRXEcpriState *s = s1->opaque;
free(s);
}
int64_t prev_ts = 0;
int64_t prev_count = 0;
#define M 32
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
(void) s1;
int write_count; int64_t ts; sample_group_t * g; int nc;
float ** _samples = (float **) __samples;
write_count = count / M;
ts = timestamp / M;
log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count);
if(prev_count && ((ts - prev_ts) != prev_count)) {
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts));
}
prev_ts = ts; prev_count = write_count;
if(write_count > rbuf_write_amount(&trxw_rbuf[0])) {
//log_exit("TRX_ECPRI_WRITE", "Not enough space to write in trxw_rbuf (write count = %d)", write_count);
update_counter(&tx_drop_counter, write_count);
return;
}
#ifdef TRACE
if((trxw_rbuf[0].write_index + write_count) >= trxw_rbuf[0].buf_len) {
log_info("TRX_ECPRI_WRITE", "TX Trace ready");
tx_trace_ready = 1;
pthread_exit(EXIT_SUCCESS);
}
#endif
if(first_trx_write) {
sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g2->count = ts;
g2->wait = 1;
g2->zeroes = 1;
rbuf_update_write_index(&trxw_group_rbuf);
}
g = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g->zeroes = __samples ? 0 : 1;
g->wait = 0;
g->count = write_count;
while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) {
if(__samples)
for(int i = 0; i < TX_N_CHANNEL; i++)
memcpy(((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index * trxw_rbuf[0].len * sizeof(Complex), (uint8_t*) _samples[i], nc * trxw_rbuf[0].len * sizeof(Complex));
trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len;
write_count -= nc;
}
rbuf_update_write_index(&trxw_group_rbuf);
update_counter(&write_counter, count / M);
}
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{
(void) s1;
int nc; int n;
float ** _samples = (float **) __samples;
int read_count = (count / M);
int offset = 0;
while(rbuf_read_amount(&trxr_rbuf[0]) < read_count);
log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter);
sync_complete = 1;
sync_happened = 1;
n = read_count;
while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) {
int len = nc * trxr_rbuf[0].len * sizeof(Complex);
for(int i = 0; i < RX_N_CHANNEL; i++ ) {
memcpy((uint8_t*) (_samples[i] + offset), ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex), len);
}
trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len;
n -= nc;
offset += len;
}
*ptimestamp = recv_counter.counter * M;
update_counter(&read_counter, read_count);
return count;
}
/* This function can be used to automatically set the sample
rate. Here we don't implement it, so the user has to force a given
sample rate with the "sample_rate" configuration option */
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
int *psample_rate_num, int sample_rate_min)
{
return -1;
}
static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
{
TRXEcpriState *s = s1->opaque;
log_info("TRX_ECPRI_START", "Start");
log_info("TRX_ECPRI_START", "trx_api_version: %d", s1->trx_api_version);
log_info("TRX_ECPRI_START", "config file: %s", s1->path);
log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if);
s->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den;
startdpdk(s);
return 0;
}
void dummy_enb_init(TRXState *s1, TRXEcpriState *s) {
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
startdpdk(s);
}
int trx_driver_init(TRXState *s1)
{
TRXEcpriState *s;
double val;
// Lock all current and future pages to prevent them from being paged out to swap
if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
log_error("TRX_ECPRI", "mlockall failed");
}
log_info("TRX_ECPRI", "Init");
if (s1->trx_api_version != TRX_API_VERSION) {
fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n",
s1->trx_api_version, TRX_API_VERSION);
return -1;
}
s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s));
trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "encode_affinity");
s->encode_affinity = (int) val;
trx_get_param_double(s1, &val, "decode_affinity");
s->decode_affinity = (int) val;
trx_get_param_double(s1, &val, "statistic_affinity");
s->statistic_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id");
s->flow_id = (int) val;
trx_get_param_double(s1, &val, "ecpri_period");
if(((int) val) == 0) {
fprintf(stderr, "ecpri_period parameter can't be null\n");
return -1;
}
s->ecpri_period = (int) val;
s->re_mac = (uint8_t *) trx_get_param_string(s1, "re_mac");
s->rec_mac = (uint8_t *) trx_get_param_string(s1, "rec_mac");
s->rec_if = (uint8_t *) trx_get_param_string(s1, "rec_if");
s->dpdk_options = trx_get_param_string(s1, "dpdk_options");
s->log_directory = (uint8_t *) trx_get_param_string(s1, "log_directory");
s1->opaque = s;
s1->trx_end_func = trx_ecpri_end;
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
s1->trx_start_func = trx_ecpri_start;
s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate;
return 0;
}
@@ -33,6 +33,10 @@
#include <time.h>
#include <unistd.h>
#include "private/trx_driver.h"
/* Proprietary code:
- fast conversion between int16_t and float
*/
#include "private/convert16_sse.c"
#define DEBUG
//#define DISABLE_SEND
@@ -216,6 +220,11 @@ typedef struct {
uint8_t gps_time[10];
} ecpri_timing_packet;
static TRXEcpriState s_recv;
static TRXEcpriState s_recv2;
static TRXEcpriState s_recv3;
static TRXEcpriState s_recv4;
// Buffers
static ring_buffer_t trxr_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxr2_rbuf[MAX_CHANNELS]; // Decoded IQ samples
@@ -228,10 +237,11 @@ static uint8_t rx3_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t rx4_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t tx_buf[MAX_PACKET_SIZE * MAX_TX_BURST];
static TRXEcpriState s_recv;
static TRXEcpriState s_recv2;
static TRXEcpriState s_recv3;
static TRXEcpriState s_recv4;
#define TAB_SIZE 65536
float *trx_read_float_tab;
float *trx_write_float_tab;
int16_t *trx_read_int_tab;
int16_t *trx_write_int_tab;
// Counters
static counter_stat_t recv_counter[MAX_CHANNELS]; // IQs received from RRH
@@ -1037,6 +1047,11 @@ int start(TRXEcpriState * s) {
//set_latency_target();
posix_memalign((void **)&trx_read_float_tab, 16, sizeof(float) * TAB_SIZE);
posix_memalign((void **)&trx_write_float_tab, 16, sizeof(float) * TAB_SIZE);
posix_memalign((void **)&trx_read_int_tab, 16, sizeof(int16_t) * TAB_SIZE);
posix_memalign((void **)&trx_write_int_tab, 16, sizeof(int16_t) * TAB_SIZE);
init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter);
for(int i = 0; i < MAX_CHANNELS; i++) {
@@ -1196,6 +1211,26 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
while(1)
usleep(1000);
#endif
/* Check for empty samples
int k = 0;
for(int j = 0; j < count; j++) {
if(_samples[0][j] == 0)
k++;
else {
k = 0;
break;
}
if(k > 20)
break;
}
if(k > 20) {
printf("samples = \n");
for(int j = 0; j < count / 2; j++)
printf("%f ", _samples[0][j]);
printf("\n");
log_exit("TRX_ECPRI_WRITE", "Null data");
}
*/
//log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count / N_SAMPLES);
@@ -1217,8 +1252,11 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
offset = 0;
count_left = count;
while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[i], count_left * 4))) {
if(__samples)
memcpy(rbuf_write(&trxw_rbuf[i]), ((uint8_t *) _samples[i]) + offset, nc);
if(__samples) {
memcpy((uint8_t *) trx_write_float_tab, ((uint8_t *) _samples[i]) + offset, nc * 2);
float_to_int16(trx_write_int_tab, trx_write_float_tab, nc / 2, 32767);
memcpy(rbuf_write(&trxw_rbuf[i]), (uint8_t *) trx_write_int_tab, nc);
}
else
log_exit("TRX_ECPRI_WRITE", "samples empty during FDD mode");
rbuf_increment_write(&trxw_rbuf[i], nc);
@@ -1229,6 +1267,8 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
update_counter(&write_counter, (s->tx_n_channel * count));
}
float mult = 1. / 32767.;
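/* Presumably a Q15-style scale: samples are stored on the ring buffers as int16 in
 * [-32767, 32767] (see float_to_int16 above) and converted back to floats in roughly
 * [-1, 1] with this multiplier on the read path. */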
static int read_trxr(ring_buffer_t * rbuf, float * samples, int count_left, int offset) {
int to_read = 0;
int read = 0;
@@ -1242,7 +1282,9 @@ static int read_trxr(ring_buffer_t * rbuf, float * samples, int count_left, int
to_read = count_left * 4;
while((nc = rbuf_contiguous_copy(rbuf, NULL, to_read))) {
memcpy(((uint8_t*) samples) + offset + read, rbuf_read(rbuf), nc);
memcpy((uint8_t*) trx_read_int_tab, rbuf_read(rbuf), nc);
int16_to_float(trx_read_float_tab, trx_read_int_tab, nc / 2, mult);
memcpy(((uint8_t*) samples) + offset + read, (uint8_t*) trx_read_float_tab, nc * 2);
rbuf_increment_read(rbuf, nc);
to_read -= nc;
read += nc;
@@ -1311,6 +1353,7 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
int *psample_rate_num, int sample_rate_min)
{
log_info("TRX_ECPRI_GET_SAMPLE_RATE", "");
return -1;
}
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#define DPDK
#ifdef DPDK
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_udp.h>
#endif
#include "private/trx_driver.h"
#include "utils.c"
//#define DEBUG // Enables / deactivates log_debug
#define PPS_UPDATE_PERIOD INT64_C(1000000000)
//#define DST_ADDR_SYNTAX // Depends on DPDK version
#define TX_PACKET_SIZE 262
#define RX_MAX_PACKET_SIZE 262
#define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14)
#define N_SAMPLES (32)
#define TRX_MAX_GROUP 1500
#define STAT_INT_LEN "9"
typedef struct {
float re;
float im;
} Complex;
typedef struct {
const uint8_t * re_mac;
const uint8_t * rec_mac;
const uint8_t * rec_if;
const char * dpdk_options;
const uint8_t * log_directory;
int recv_affinity;
int send_affinity;
int encode_affinity;
int decode_affinity;
int statistic_affinity;
int ecpri_period;
int flow_id;
int frame_frequency;
int trx_buf_size;
int txrx_buf_size;
int trace_rx;
int trace_tx;
int trace_offset;
int monitor_pps;
int monitor_trigger_duration;
int start_sending;
int start_receiving;
int rx_n_channel;
int tx_n_channel;
int statistics_refresh_rate_ns;
int sample_rate;
} TRXEcpriState;
typedef struct {
int64_t counter;
int64_t pps_counter;
int64_t pps_ts;
int64_t pps;
} counter_stat_t;
typedef struct {
volatile void * buffer;
char name[64];
int buf_len;
int len;
volatile int write_index;
volatile int read_index;
volatile int write_ahead;
} ring_buffer_t;
typedef struct {
int64_t count;
uint8_t wait;
uint8_t zeroes;
} sample_group_t;
/* Proprietary code:
- compression / decompression of IQ samples
- fast conversion between int16_t and float
*/
#include "private/bf1_avx2.c"
//#include "private/bf1_avx2_nop.c"
// Buffers
static ring_buffer_t rx_rbuf; // Received packets
static ring_buffer_t trxr_rbuf[4]; // Decoded IQ samples
static ring_buffer_t tx_rbuf; // Packets to send
static ring_buffer_t trxw_rbuf[4]; // Uncompressed IQ samples
static ring_buffer_t trxw_group_rbuf; // Group of IQ samples
// Counters
static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // received frames dropped (rx_rbuf full)
static volatile counter_stat_t tx_drop_counter; // TRX write frames dropped (trxw_rbuf full)
static volatile int sync_complete = 0;
static volatile int received_pkts = 0;
static volatile int recv_pps_threshold_hit = 0;
static int first_trx_write = 1;
#ifndef DPDK
static uint8_t pkt_frame_full[1024];
#endif
static volatile int rx_trace_ready = 0;
static volatile int tx_trace_ready = 0;
static int64_t encode_counter_prev = 0;
static int64_t decode_counter_prev = 0;
static int recv_pps_threshold;
// Network
static volatile int seq_id;
static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
}
static void rbuf_update_read_index(ring_buffer_t * rbuf) {
rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len;
}
static int rbuf_read_amount(ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
}
static int rbuf_write_amount(ring_buffer_t * rbuf) {
// Don't write everything to avoid the write index catching up to the read index
// That way we don't have to use locks
return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len);
}
static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) {
int ret = n;
if(rbuf1) {
n = rbuf1->buf_len - rbuf1->read_index;
ret = n < ret ? n : ret;
}
if(rbuf2)
n = rbuf2->buf_len - rbuf2->write_index;
return n < ret ? n : ret;
}
#define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_WRITE(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.write_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len * sizeof(type)));\
rbuf.buffer = (type *) malloc(_buf_len * _len * sizeof(type));\
strcpy(rbuf.name, _name);\
rbuf.buf_len = _buf_len;\
rbuf.len = _len;\
rbuf.write_index = 0;\
rbuf.read_index = 0;\
rbuf.write_ahead = 0;\
} while(0)
static void print_stats(FILE * f, int print_header) {
if(print_header) {
fprintf(f,
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"\n",
"rx dropped",
"tx dropped",
"received",
"decode",
"read",
"write",
"encode",
"sent",
"received pps",
"decode pps",
"read pps",
"write pps",
"encode pps",
"sent pps");
}
fprintf(f,
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"\n",
rx_drop_counter.counter,
tx_drop_counter.counter,
recv_counter.counter,
decode_counter.counter,
read_counter.counter,
write_counter.counter,
encode_counter.counter,
sent_counter.counter,
recv_counter.pps,
decode_counter.pps,
read_counter.pps,
write_counter.pps,
encode_counter.pps,
sent_counter.pps);
}
static void log_exit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " EXIT [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
fprintf(stderr, "%s\n", line);
// Dump useful information
print_stats(stderr, 1);
fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf));
fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf));
fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0]));
fprintf(stderr, "TRXW GROUP RBUF: ri %d wi %d ra %d wa %d\n", trxw_group_rbuf.read_index, trxw_group_rbuf.write_index, rbuf_read_amount(&trxw_group_rbuf), rbuf_write_amount(&trxw_group_rbuf));
fflush(stdout);
fflush(stderr);
exit(EXIT_FAILURE);
}
#define BURST_SIZE 16
#define TX_POOL_SIZE 16
int8_t tx_data[BURST_SIZE][TX_PACKET_SIZE];
#ifdef DPDK
#include "dpdk.c"
static void send_packets(int port) {
struct rte_mbuf * pkt[TX_POOL_SIZE];
struct rte_ether_hdr *eth_hdr;
uint16_t nb_tx = 0;
for(int i = 0; i < TX_POOL_SIZE; i++) {
int pkt_size;
pkt[i] = rte_pktmbuf_alloc(tx_mbuf_pool);
eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*);
if(port) {
eth_hdr->d_addr = s_addr;
eth_hdr->s_addr = d_addr;
} else {
eth_hdr->d_addr = d_addr;
eth_hdr->s_addr = s_addr;
}
eth_hdr->ether_type = htons(0xaefe);
memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), tx_data[i], TX_ECPRI_PACKET_SIZE);
pkt_size = TX_PACKET_SIZE;
pkt[i]->data_len = pkt_size;
pkt[i]->pkt_len = pkt_size;
}
while(nb_tx < TX_POOL_SIZE) {
int64_t x = TX_POOL_SIZE - nb_tx;
nb_tx += rte_eth_tx_burst(port, 0, pkt + nb_tx, x > BURST_SIZE ? BURST_SIZE : x);
}
/* Free any unsent packets. */
if (nb_tx < BURST_SIZE) {
uint16_t buf;
for (buf = nb_tx; buf < BURST_SIZE; buf++)
rte_pktmbuf_free(pkt[buf]);
log_exit("SEND_THREAD", "Sent %d packets instead of %d", nb_tx, BURST_SIZE);
}
}
/* DPDK */
#else
struct rte_mbuf {
int buf_addr;
int data_off;
};
void rte_pktmbuf_free(void * pkt) {
(void) pkt;
//for(int i = 0; i < 1000; i ++)
// asm("NOP");
}
#endif
static void init_counter(volatile counter_stat_t * c) {
c->counter = 0;
c->pps_counter = 0;
c->pps_ts = 0;
c->pps = 0;
}
static void update_counter_pps(volatile counter_stat_t * c) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_TAI, &_ts);
ts = ts_to_int(_ts);
if((ts - c->pps_ts) > PPS_UPDATE_PERIOD) {
if(c->pps_ts)
c->pps = ((c->counter - c->pps_counter) * NSEC_PER_SEC) / (ts - c->pps_ts);
c->pps_counter = c->counter;
c->pps_ts = ts;
}
}
static void update_counter(volatile counter_stat_t * c, int64_t v) {
c->counter += v;
}
static void trace_handler(struct timespec initial, TRXEcpriState * s) {
struct timespec next;
int ready = 1;
if(s->trace_tx)
ready &= tx_trace_ready;
if(s->trace_rx)
ready &= rx_trace_ready;
if(ready) {
int64_t d;
FILE * f;
char n[256];
int start;
uint8_t ones[14];
for(int i = 0; i < 14; i++)
ones[i] = 0xff;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
log_info("TRACE", "Packets sent: %" PRIi64, sent_counter.counter);
log_info("TRACE", "Duration: %" PRIi64, d);
usleep(1000 * 200);
memset(n, '\0', 256);
sprintf(n, "%s/tx.trace", s->log_directory);
f = fopen(n, "wb+");
start = (s->trace_offset + encode_counter_prev) % tx_rbuf.buf_len;
log_info("TRACE", "Writing %d frames to tx.trace", (tx_rbuf.write_index + tx_rbuf.buf_len - start) % tx_rbuf.buf_len);
for(int i = start; i != tx_rbuf.write_index; i = (i + 1) % tx_rbuf.buf_len) {
fwrite(ones, 14, 1, f);
fwrite(((uint8_t*) tx_rbuf.buffer) + i * tx_rbuf.len, tx_rbuf.len, 1, f);
}
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxw.trace", s->log_directory);
f = fopen(n, "wb+");
start = (s->trace_offset) % trxw_rbuf[0].buf_len;
log_info("TRACE", "Writing %d frames to trxw.trace", (trxw_rbuf[0].write_index + trxw_rbuf[0].buf_len - start) % trxw_rbuf[0].buf_len);
for(int i = start; i != trxw_rbuf[0].write_index; i = (i + 1) % trxw_rbuf[0].buf_len) {
for(int j = 0; j < s->tx_n_channel; j++)
fwrite((uint8_t *) (((Complex *) trxw_rbuf[j].buffer) + i * trxw_rbuf[0].len), trxw_rbuf[0].len * sizeof(Complex), 1, f);
}
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/rx.trace", s->log_directory);
f = fopen(n, "wb+");
start = s->trace_offset % rx_rbuf.buf_len;
log_info("TRACE", "Writing %d frames to rx.trace", (rx_rbuf.write_index + rx_rbuf.buf_len - start) % rx_rbuf.buf_len);
for(int i = start; i != rx_rbuf.write_index; i = (i + 1) % rx_rbuf.buf_len) {
fwrite(((uint8_t*) rx_rbuf.buffer) + i * rx_rbuf.len, rx_rbuf.len, 1, f);
}
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxr.trace", s->log_directory);
f = fopen(n, "wb+");
start = (s->trace_offset + decode_counter_prev) % trxr_rbuf[0].buf_len;
log_info("TRACE", "Writing %d frames to trxr.trace", (trxr_rbuf[0].write_index + trxr_rbuf[0].buf_len - start) % trxr_rbuf[0].buf_len);
for(int i = start; i != trxr_rbuf[0].write_index; i = (i + 1) % trxr_rbuf[0].buf_len) {
for(int j = 0; j < s->rx_n_channel; j++)
fwrite((uint8_t *) (((Complex *) trxr_rbuf[j].buffer) + i * trxr_rbuf[0].len), trxr_rbuf[0].len * sizeof(Complex), 1, f);
}
fclose(f);
log_exit("", "Finished tracing");
}
}
static void *recv_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
#ifdef DPDK
int first_seq_id = 1;
#else
int64_t target_counter = 0;
struct timespec current, previous;
#endif
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
#define RTE_MBUF_SIZE 20000
#define MIN_RX 10000
for(int64_t i = 0;; i++) {
struct rte_mbuf * pkt[RTE_MBUF_SIZE];
uint8_t * buf;
#ifdef DPDK
uint8_t * rtebuf;
int port = 0;
#endif
int nb_rx = 0;
int n;
int drop_packet = 0;
#ifdef DPDK
while(!nb_rx)
nb_rx = rte_eth_rx_burst(port, 0, pkt + nb_rx, 1024);
#else
// Throttle the rate of simulated packet reception
if(recv_counter.counter >= target_counter) {
clock_gettime(CLOCK_TAI, &current);
if(!i || calcdiff_ns(current, previous) >= (1000 * 1000 * 10)) {
target_counter += s->frame_frequency / 100;
previous = current;
}
}
if(recv_counter.counter < target_counter) {
nb_rx = 1024;
usleep(200);
}
else
continue;
#endif
if(nb_rx > RTE_MBUF_SIZE)
log_exit("RECV_THREAD", "nb_rx (%d) > RTE_MBUF_SIZE (%d)", nb_rx, RTE_MBUF_SIZE);
received_pkts = 1;
n = rbuf_write_amount(&rx_rbuf);
drop_packet = nb_rx > n;
if(drop_packet) {
for(int i = 0; i < nb_rx; i++)
rte_pktmbuf_free(pkt[i]);
if(nb_rx)
update_counter(&rx_drop_counter, nb_rx);
}
else {
int nc; int nr; int k = 0;
nr = nb_rx;
while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) {
if(s->trace_rx) {
if((recv_counter.counter + nc) >= (rx_rbuf.buf_len + s->trace_offset)) {
rx_trace_ready = 1;
log_info("RECV_THREAD", "RX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (rx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
for(int i = 0; i < nc; i++) {
#ifdef DPDK
rtebuf = (uint8_t *) (pkt[i + k])->buf_addr + (pkt[i + k])->data_off;
if(first_seq_id) {
uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
printf("seq_id = %d\n", seq_id);
first_seq_id = 0;
}
memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len);
#else
//memcpy(buf + i * rx_rbuf.len, pkt_frame_full, rx_rbuf.len);
#endif
}
rx_rbuf.write_index = (rx_rbuf.write_index + nc) % rx_rbuf.buf_len;
for(int i = 0; i < nc; i++)
rte_pktmbuf_free(pkt[i + k]);
nr -= nc;
k += nc;
}
}
update_counter(&recv_counter, nb_rx);
}
pthread_exit(EXIT_SUCCESS);
}
// Send as soon as packets are encoded
// Signal to encode thread that packets have been sent
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) {
int64_t n = rbuf_read_amount(&tx_rbuf);
if(n >= BURST_SIZE) {
int nb_burst = n / BURST_SIZE;
for(int j = 0; j < nb_burst; j++) {
for(int k = 0; k < BURST_SIZE; k++) {
memcpy(tx_data[k], RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len);
rbuf_update_read_index(&tx_rbuf);
}
#ifdef DPDK
send_packets(0);
#else
for(int i = 0; i < 3000; i++)
asm("NOP");
#endif
}
update_counter(&sent_counter, nb_burst * BURST_SIZE);
}
}
pthread_exit(EXIT_SUCCESS);
}
/*
If sync has happened (= we have received frames):
Prepare as soon as TRX has packet to write
Signal
Else:
Prepare as soon as there is space in tx buffer
*/
#define TX_SYNC_BURST_SIZE 512
static void *encode_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int64_t target_counter = 0;
struct timespec next;
int reset_encode_counter = 1;
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->encode_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);
for(int64_t i = 0;; i++) {
int n;
n = rbuf_write_amount(&tx_rbuf);
// Send empty frames until we receive something
if(s->start_sending) {
if(!sync_complete) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(encode_counter.counter > target_counter) {
int k = (encode_counter.counter - target_counter + (s->frame_frequency / 100) - 1) / (s->frame_frequency / 100);
add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * s->frame_frequency / 100;
}
n = (n > TX_SYNC_BURST_SIZE) ? n : TX_SYNC_BURST_SIZE;
n = (n < (s->frame_frequency / 100)) ? n : (s->frame_frequency / 100);
for(int j = 0; j < n; j++) {
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
update_counter(&encode_counter, n);
}
else if (reset_encode_counter) {
if(s->trace_tx)
encode_counter_prev = encode_counter.counter;
encode_counter.counter = 0;
reset_encode_counter = 0;
seq_id = 0;
}
}
// If we have frames to encode (is there space in TX buffer)
// If there are frames from trx_write callback to encode
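// Each sample_group_t read from trxw_group_rbuf describes one trx_write call:
// 'count' frames to encode, 'zeroes' set when the payload must be zero-filled
// (padding), and 'wait' set only for the initial group, whose 'count' is then
// interpreted as an offset from the current encode counter.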
if(n && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
sample_group_t * g; int nb_frames;
g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
if(g->wait) {
g->wait = 0;
g->count -= encode_counter.counter;
g->zeroes = 1;
}
nb_frames = g->count > n ? n : g->count;
g->count -= nb_frames;
if(s->trace_tx) {
if(sync_complete && (encode_counter.counter + nb_frames) >= (tx_rbuf.buf_len + s->trace_offset)) {
tx_trace_ready = 1;
log_info("ENCODE_THREAD", "TX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (tx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
if(g->zeroes) {
for(int j = 0; j < nb_frames; j++) {
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nb_frames) % trxw_rbuf[0].buf_len;
} else {
int nc;
int nf = nb_frames;
while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) {
Complex * iq_samples[4];
uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8;
for(int j = 0; j < s->tx_n_channel; j++)
iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len);
for(int i = 0; i < nc; i++) {
for(int j = 0; j < s->tx_n_channel; j++)
encode_s64_b60_2(buf + j * 60, (float *) iq_samples[j]);
*((uint16_t *)(buf - 2)) = htons(seq_id++);
for(int j = 0; j < s->tx_n_channel; j++)
iq_samples[j] += trxw_rbuf[0].len;
buf += tx_rbuf.len;
}
tx_rbuf.write_index = (tx_rbuf.write_index + nc) % tx_rbuf.buf_len;
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len;
nf -= nc;
}
if(nf)
exit(EXIT_FAILURE);
}
update_counter(&encode_counter, nb_frames);
if(!g->count) {
rbuf_update_read_index(&trxw_group_rbuf);
}
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *decode_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
struct timespec next;
int64_t target_counter = 0;
int reset_decode_counter = 1;
log_info("DECODE_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->decode_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity);
for(int64_t i = 0;; i++) {
int n, nc;
if(s->start_receiving) {
if(!received_pkts) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next);
// Limit packets sent
if(decode_counter.counter > target_counter) {
int k = (decode_counter.counter - target_counter + (s->frame_frequency / 100) - 1) / (s->frame_frequency / 100);
add_ns(&next, k * 1000 * 1000 * 10); // 10 ms per chunk of frame_frequency / 100 packets (38400 at the nominal 3.84 MHz frame frequency)
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * (s->frame_frequency / 100);
}
n = (s->frame_frequency / 100);
for(int j = 0; j < n; j++)
rbuf_update_write_index(&trxr_rbuf[0]);
update_counter(&decode_counter, n);
continue;
}
else if (reset_decode_counter) {
if(s->trace_rx)
decode_counter_prev = decode_counter.counter;
decode_counter.counter = 0;
reset_decode_counter = 0;
}
}
while(!(n = rbuf_read_amount(&rx_rbuf))) {};
while(rbuf_write_amount(&trxr_rbuf[0]) < n) {};
while((nc = rbuf_contiguous_copy(&rx_rbuf, &trxr_rbuf[0], n))) {
uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22;
if(s->trace_rx) {
if(received_pkts && ((decode_counter.counter + nc) >= (trxr_rbuf[0].buf_len + s->trace_offset))) {
rx_trace_ready = 1;
log_info("DECODE_THREAD", "RX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (rx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
Complex * iq_samples[4];
for(int i = 0; i < s->rx_n_channel; i++)
iq_samples[i] = (((Complex *) trxr_rbuf[i].buffer) + (trxr_rbuf[0].write_index * trxr_rbuf[0].len));
for(int i = 0; i < nc; i++) {
for(int j = 0; j < s->rx_n_channel ; j++) {
decode_s64_b60_2((float *) (iq_samples[j] + i * 32), buf + j * 60 + i * rx_rbuf.len);
}
}
trxr_rbuf[0].write_index = (trxr_rbuf[0].write_index + nc) % trxr_rbuf[0].buf_len;
rx_rbuf.read_index = (rx_rbuf.read_index + nc) % rx_rbuf.buf_len;
n -= nc;
update_counter(&decode_counter, nc);
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *statistic_thread(void *p) {
struct timespec next, initial;
int64_t recv_stop = 0;
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
FILE * stats_file_desc;
log_info("STATISTIC_THREAD", "Thread init");
char stats_file_name[256];
memset(stats_file_name, '\0', 256);
sprintf(stats_file_name, "%s/ecpri.stats", s->log_directory);
stats_file_desc = fopen(stats_file_name, "w+");
if(!stats_file_desc)
error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name);
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->statistic_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->statistic_affinity);
clock_gettime(CLOCK_TAI, &initial);
next = initial;
for(int64_t i = 0;; i++) {
add_ns(&next, s->statistics_refresh_rate_ns);
if(s->trace_rx || s->trace_tx)
trace_handler(initial, s);
print_stats(stats_file_desc, (i % 50) == 0);
#ifdef DEBUG
fprintf(stats_file_desc,
"%d %d %d %d %d %d %d %d\n",
rx_rbuf.write_index,
rx_rbuf.read_index,
trxr_rbuf[0].write_index,
trxr_rbuf[0].read_index,
trxw_rbuf[0].write_index,
trxw_rbuf[0].read_index,
tx_rbuf.write_index,
tx_rbuf.read_index);
fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
#endif
fflush(stats_file_desc);
update_counter_pps(&rx_drop_counter);
update_counter_pps(&tx_drop_counter);
update_counter_pps(&recv_counter);
update_counter_pps(&decode_counter);
update_counter_pps(&read_counter);
update_counter_pps(&write_counter);
update_counter_pps(&encode_counter);
update_counter_pps(&sent_counter);
if(s->monitor_pps) {
if(recv_counter.pps > recv_pps_threshold) {
recv_pps_threshold_hit = 1;
}
if(recv_pps_threshold_hit && recv_counter.pps < recv_pps_threshold) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_MONOTONIC, &_ts);
ts = ts_to_int(_ts);
if((recv_stop && ((ts - recv_stop) > (s->monitor_trigger_duration) * INT64_C(1000000000)))) {
if(s->monitor_pps == 1)
log_exit("MONITOR", "Stopped recieving packets, restarting...");
log_info("MONITOR", "Stopped recieving packets, sending again...");
sync_complete = 0;
recv_stop = 0;
}
if(!recv_stop)
recv_stop = ts;
}
}
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t send_pthread;
pthread_t encode_pthread;
pthread_t decode_pthread;
pthread_t statistic_pthread;
struct sched_param recv_param;
struct sched_param send_param;
struct sched_param encode_param;
struct sched_param decode_param;
struct sched_param statistic_param;
pthread_attr_t recv_attr;
pthread_attr_t send_attr;
pthread_attr_t encode_attr;
pthread_attr_t decode_attr;
pthread_attr_t statistic_attr;
log_info("TRX_ECPRI", "Starting threads");
// Initialize pthread attributes (default values)
if (pthread_attr_init(&recv_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
// Set a specific stack size
if (pthread_attr_setstacksize(&recv_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
// Set scheduler policy and priority of pthread
if (pthread_attr_setschedpolicy(&recv_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv_attr, &recv_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
/* Use scheduling parameters of attr */
if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&send_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
send_param.sched_priority = 97;
if (pthread_attr_setschedparam(&send_attr, &send_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&encode_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&encode_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&encode_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
encode_param.sched_priority = 97;
if (pthread_attr_setschedparam(&encode_attr, &encode_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&encode_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&decode_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&decode_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&decode_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
decode_param.sched_priority = 97;
if (pthread_attr_setschedparam(&decode_attr, &decode_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&decode_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&statistic_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&statistic_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&statistic_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
statistic_param.sched_priority = 97;
if (pthread_attr_setschedparam(&statistic_attr, &statistic_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&statistic_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_create(&statistic_pthread, NULL, statistic_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create statistic thread");
usleep(1000 * 20);
if (pthread_create(&encode_pthread, NULL, encode_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create encode thread");
usleep(1000 * 20);
if (pthread_create(&decode_pthread, NULL, decode_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decode thread");
usleep(1000 * 20);
if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread");
usleep(1000 * 500);
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
return 0;
}
int startdpdk(TRXEcpriState * s) {
uint8_t ecpri_message[TX_ECPRI_PACKET_SIZE];
int argc = 1;
int k = 1;
int prev_space = -1;
char ** argv;
#ifndef DPDK
for(int i = 0; i < 262; i++)
pkt_frame_full[i] = 0xff;
#endif
for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ')
argc++;
else if(s->dpdk_options[i] == '\0')
break;
}
argv = (char **) malloc(sizeof(char *) * argc);
for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ') {
argv[k] = (char *) malloc(i - prev_space);
strncpy(argv[k], s->dpdk_options + prev_space + 1, i - prev_space -1);
argv[k][i - prev_space-1] = '\0';
prev_space = i;
k++;
}
else if(s->dpdk_options[i] == '\0') {
break;
}
}
argv[0] = "";
#ifdef DPDK
init_dpdk(argc, argv);
#endif
log_info("TRX_ECPRI", "Start");
//set_latency_target();
seq_id = 0;
init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter);
init_counter(&recv_counter);
init_counter(&decode_counter);
init_counter(&read_counter);
init_counter(&write_counter);
init_counter(&encode_counter);
init_counter(&sent_counter);
RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, TX_ECPRI_PACKET_SIZE, uint8_t);
for(int i = 0; i < s->tx_n_channel; i++) {
char name[256];
sprintf(name, "TRXWrite Ring Buffer %d", i);
RBUF_INIT(trxw_rbuf[i], name, s->trx_buf_size, N_SAMPLES, Complex);
}
for(int i = 0; i < s->rx_n_channel; i++) {
char name[256];
sprintf(name, "TRXRead Ring Buffer %d", i);
RBUF_INIT(trxr_rbuf[i], name, s->trx_buf_size, N_SAMPLES, Complex);
}
RBUF_INIT(trxw_group_rbuf, "TRXGroupWrite ring buffer", TRX_MAX_GROUP, 1, sample_group_t);
memset((uint8_t *) ecpri_message, 0, TX_ECPRI_PACKET_SIZE);
#ifdef DPDK
if(sscanf((char *) s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&d_addr.addr_bytes[0],
&d_addr.addr_bytes[1],
&d_addr.addr_bytes[2],
&d_addr.addr_bytes[3],
&d_addr.addr_bytes[4],
&d_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid eRE MAC address\n");
if(sscanf((char *) s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&s_addr.addr_bytes[0],
&s_addr.addr_bytes[1],
&s_addr.addr_bytes[2],
&s_addr.addr_bytes[3],
&s_addr.addr_bytes[4],
&s_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid eREC MAC address\n");
#endif
/* Standard Header */
ecpri_message[0] = 0x10; // Protocol data revision 0x1, C = 0
// Message type = 0x00, IQ data
// Payload size
*((uint16_t *) (ecpri_message + 2)) = htons(244);
*((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id);
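// eCPRI common header layout used for every outgoing frame:
// byte 0: protocol revision / C bit, byte 1: message type (0x00 = IQ data),
// bytes 2-3: payload size (244), bytes 4-5: PC_ID / flow id,
// bytes 6-7: sequence id, filled in later by the encode thread.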
for(int i = 0; i < tx_rbuf.buf_len; i++)
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len);
start_threads(s);
return 0;
}
static void trx_ecpri_end(TRXState *s1)
{
log_info("TRX_ECPRI", "End");
TRXEcpriState *s = s1->opaque;
free(s);
}
static int64_t prev_ts = 0;
static int64_t prev_count = 0;
#define M 32
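// M: number of IQ samples carried per eCPRI frame; sample counts and
// timestamps coming from the TRX API are divided by M to convert them into
// frame counts.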
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
int write_count; int64_t ts; sample_group_t * g; int nc; int nk = 0;
float ** _samples = (float **) __samples;
TRXEcpriState *s = s1->opaque;
write_count = count / M;
ts = timestamp / M;
log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count);
if(prev_count && ((ts - prev_ts) != prev_count)) {
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts));
}
prev_ts = ts; prev_count = write_count;
if(write_count > rbuf_write_amount(&trxw_rbuf[0])) {
//log_exit("TRX_ECPRI_WRITE", "Not enough space to write in trxw_rbuf (write count = %d)", write_count);
update_counter(&tx_drop_counter, write_count);
return;
}
if(s->trace_tx) {
if((write_counter.counter + write_count) >= (trxw_rbuf[0].buf_len + s->trace_offset)) {
tx_trace_ready = 1;
log_info("TRX_ECPRI_WRITE", "TX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (tx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
if(first_trx_write) {
sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g2->count = ts;
g2->wait = 1;
g2->zeroes = 1;
rbuf_update_write_index(&trxw_group_rbuf);
}
g = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g->zeroes = __samples ? 0 : 1;
g->wait = 0;
g->count = write_count;
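// rbuf_contiguous_copy() is defined in utils.c (not shown here); it is assumed
// to return how many entries can be copied before the buffer index wraps,
// capped by its last argument, so the loop below copies the samples in
// contiguous chunks.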
while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) {
int len = nc * trxr_rbuf[0].len * sizeof(Complex);
if(__samples)
for(int i = 0; i < s->tx_n_channel; i++) {
uint8_t * src = ((uint8_t*) _samples[i]) + (nk * trxr_rbuf[0].len * sizeof(Complex));
uint8_t * dst = ((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index * trxw_rbuf[0].len * sizeof(Complex);
memcpy(dst, src, len);
}
trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len;
write_count -= nc;
nk += nc;
}
rbuf_update_write_index(&trxw_group_rbuf);
update_counter(&write_counter, count / M);
}
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{
int nc; int n;
float ** _samples = (float **) __samples;
int read_count = (count / M);
int offset = 0;
TRXEcpriState *s = s1->opaque;
log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter);
while(rbuf_read_amount(&trxr_rbuf[0]) < read_count);
sync_complete = 1;
n = read_count;
while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) {
int len = nc * trxr_rbuf[0].len * sizeof(Complex);
for(int i = 0; i < s->rx_n_channel; i++ ) {
uint8_t * dst = (uint8_t*) (_samples[i] + offset);
uint8_t * src = ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex);
memcpy(dst, src, len);
}
trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len;
n -= nc;
offset += len;
}
*ptimestamp = (read_counter.counter) * M;
update_counter(&read_counter, read_count);
return count;
}
/* This function can be used to automatically set the sample
rate. Here we don't implement it, so the user has to force a given
sample rate with the "sample_rate" configuration option */
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
int *psample_rate_num, int sample_rate_min)
{
return -1;
}
static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
{
TRXEcpriState *s = s1->opaque;
log_info("TRX_ECPRI_START", "Start");
log_info("TRX_ECPRI_START", "trx_api_version: %d", s1->trx_api_version);
log_info("TRX_ECPRI_START", "config file: %s", s1->path);
log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if);
s->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den;
startdpdk(s);
return 0;
}
void dummy_enb_init(TRXState *s1, TRXEcpriState *s) {
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
startdpdk(s);
}
/* Called to start the transceiver. Return 0 if OK, < 0 on error. */
int trx_start_func(TRXState *s, const TRXDriverParams *p) {
log_info("DEBUG", "trx_start_func");
return 0;
}
/* Deprecated, use trx_write_func2 instead.
Write 'count' samples on each channel of the TX port
'tx_port_index'. samples[0] is the array for the first
channel. timestamp is the time (in samples) at which the first
sample must be sent. When the TRX_WRITE_FLAG_PADDING flag is
set, samples is set to NULL. It indicates that no data should
be sent (TDD receive time). TRX_WRITE_FLAG_END_OF_BURST is set
to indicate in advance that the next write call will have the
TRX_WRITE_FLAG_PADDING flag set. Note:
TRX_WRITE_FLAG_END_OF_BURST and TRX_WRITE_FLAG_PADDING are
never set simultaneously.
*/
void trx_write_func(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int flags, int tx_port_index) {
log_info("DEBUG", "**");
}
/* Deprecated, use trx_read_func2 instead.
Read 'count' samples from each channel. samples[0] is the array
for the first channel. *ptimestamp is the time at which the
first sample was received. Return the number of samples read
(=count).
Note: It is explicitly allowed that the application calls
trx_write_func, trx_read_func, trx_set_tx_gain_func and
trx_set_rx_gain_func from different threads.
*/
int trx_read_func(TRXState *s, trx_timestamp_t *ptimestamp, void **samples, int count, int rx_port_index) {
log_info("DEBUG", "**");
return 0;
}
/* Dynamically set the transmit gain (in dB). The origin and range are
driver dependent.
Note: this function is only used for user supplied dynamic
adjustments.
*/
void trx_set_tx_gain_func(TRXState *s, double gain, int channel_num) {
log_info("DEBUG", "trx_set_tx_gain_func");
}
/* Dynamically set the receive gain (in dB). The origin and range are
driver dependent.
Note: this function is only used for user supplied dynamic
adjustments.
*/
void trx_set_rx_gain_func(TRXState *s, double gain, int channel_num) {
log_info("DEBUG", "trx_set_rx_gain_func");
}
/* Return the maximum number of samples per TX packet. Called by
* the application after trx_start_func.
* Optional
*/
int trx_get_tx_samples_per_packet_func(TRXState *s) {
log_info("DEBUG", "trx_get_tx_samples_per_packet_func");
return 0;
}
/* Return some statistics. Return 0 if OK, < 0 if not available. */
int trx_get_stats(TRXState *s, TRXStatistics *m) {
log_info("DEBUG", "trx_get_stats");
return 0;
}
/* Callback must allocate info buffer that will be displayed */
void trx_dump_info(TRXState *s, trx_printf_cb cb, void *opaque) {
log_info("DEBUG", "*opaque");
}
/* Return the absolute TX power in dBm for the TX channel
'channel_num' assuming a square signal of maximum
amplitude. This function can be called from any thread and
needs to be fast. Return 0 if OK, -1 if the result is not
available. */
int trx_get_abs_tx_power_func(TRXState *s,
float *presult, int channel_num) {
log_info("DEBUG", "trx_get_abs_tx_power_func");
return 0;
}
/* Return the absolute RX power in dBm for the RX channel
'channel_num' assuming a square signal of maximum
amplitude. This function can be called from any thread and
needs to be fast. Return 0 if OK, -1 if the result is not
available. */
int trx_get_abs_rx_power_func(TRXState *s,
float *presult, int channel_num) {
log_info("DEBUG", "trx_get_abs_rx_power_func");
return 0;
}
/* Remote API communication
* Available since API v14
* trx_msg_recv_func: called for each received trx message
* trx_msg_send_func: call it to send trx messages (they must be registered by the client)
* For each message, a call to the send API must be done
*/
void trx_msg_recv_func(TRXState *s, TRXMsg *msg) {
log_info("DEBUG", "trx_msg_recv_func");
}
TRXMsg* trx_msg_send_func(TRXState *s) {
log_info("DEBUG", "trx_msg_send_func");
return NULL;
}
/* Return actual transmit gain (in dB). The origin and range are
driver dependent.
*/
void trx_get_tx_gain_func(TRXState *s, double *gain, int channel_num) {
log_info("DEBUG", "trx_get_tx_gain_func");
}
/* Returns actual receive gain (in dB). The origin and range are
driver dependent.
*/
void trx_get_rx_gain_func(TRXState *s, double *gain, int channel_num) {
log_info("DEBUG", "trx_get_rx_gain_func");
}
/* Stop operation of the transceiver - to be called after trx_start.
Resources allocated in init are not released, so trx_start can be called again */
void trx_stop_func(TRXState *s) {
log_info("DEBUG", "trx_stop_func");
}
/* OFDM mode: experimental 7.2 API */
/* read the current timestamp (only used in OFDM mode). Return 0
if OK, < 0 if not supported by device. */
int trx_read_timestamp(TRXState *s, trx_timestamp_t *ptimestamp,
int port_index) {
log_info("DEBUG", "trx_read_timestamp");
return 0;
}
void trx_set_tx_streams(TRXState *s, int rf_port_index,
int n_streams, const TRXOFDMStreamInfo *streams) {
log_info("DEBUG", "trx_set_tx_streams");
}
void trx_set_rx_streams(TRXState *s, int rf_port_index,
int n_streams, const TRXOFDMStreamInfo *streams) {
log_info("DEBUG", "trx_set_rx_streams");
}
/* schedule the reading of OFDM symbols. Return 0 if OK, < 0 if
error. */
int trx_schedule_read(TRXState *s, int rf_port_index,
const TRXScheduledSymbol *symbols,
int n_symbols) {
log_info("DEBUG", "trx_schedule_read");
return 0;
}
/* AGC functions */
int trx_set_agc_func(TRXState *s, const TRXAGCParams *p, int channel) {
log_info("DEBUG", "trx_set_agc_func");
return 0;
}
int trx_get_agc_func(TRXState *s, TRXAGCParams *p, int channel) {
log_info("DEBUG", "trx_get_agc_func");
return 0;
}
int trx_driver_init(TRXState *s1)
{
TRXEcpriState *s;
double val;
// Lock all current and future pages to prevent them from being paged out to swap
if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
log_error("TRX_ECPRI", "mlockall failed");
}
log_info("TRX_ECPRI", "Init");
if (s1->trx_api_version != TRX_API_VERSION) {
fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n",
s1->trx_api_version, TRX_API_VERSION);
return -1;
}
s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s));
trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "encode_affinity");
s->encode_affinity = (int) val;
trx_get_param_double(s1, &val, "decode_affinity");
s->decode_affinity = (int) val;
trx_get_param_double(s1, &val, "statistic_affinity");
s->statistic_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id");
s->flow_id = (int) val;
trx_get_param_double(s1, &val, "frame_frequency");
s->frame_frequency = (int) val;
trx_get_param_double(s1, &val, "trx_buf_size");
s->trx_buf_size = (int) val;
trx_get_param_double(s1, &val, "txrx_buf_size");
s->txrx_buf_size = (int) val;
trx_get_param_double(s1, &val, "trace_rx");
s->trace_rx = (int) val;
trx_get_param_double(s1, &val, "trace_tx");
s->trace_tx = (int) val;
trx_get_param_double(s1, &val, "trace_offset");
s->trace_offset = (int) val;
trx_get_param_double(s1, &val, "monitor_pps");
s->monitor_pps = (int) val;
trx_get_param_double(s1, &val, "monitor_trigger_duration");
s->monitor_trigger_duration = (int) val;
trx_get_param_double(s1, &val, "start_sending");
s->start_sending = (int) val;
trx_get_param_double(s1, &val, "start_receiving");
s->start_receiving = (int) val;
trx_get_param_double(s1, &val, "rx_n_channel");
s->rx_n_channel = (int) val;
trx_get_param_double(s1, &val, "tx_n_channel");
s->tx_n_channel = (int) val;
trx_get_param_double(s1, &val, "statistics_refresh_rate_ns");
s->statistics_refresh_rate_ns = (int) val;
trx_get_param_double(s1, &val, "ecpri_period");
s->ecpri_period = (int) val;
if(s->ecpri_period == 0)
log_exit("TRX_ECPRI", "ecpri_period parameter can't be zero\n");
if(s->rx_n_channel == 0)
log_exit("TRX_ECPRI", "rx_n_channel parameter can't be zero\n");
if(s->tx_n_channel == 0)
log_exit("TRX_ECPRI", "tx_n_channel parameter can't be zero\n");
s->re_mac = (uint8_t *) trx_get_param_string(s1, "re_mac");
s->rec_mac = (uint8_t *) trx_get_param_string(s1, "rec_mac");
s->rec_if = (uint8_t *) trx_get_param_string(s1, "rec_if");
s->dpdk_options = trx_get_param_string(s1, "dpdk_options");
s->log_directory = (uint8_t *) trx_get_param_string(s1, "log_directory");
recv_pps_threshold = (s->frame_frequency * 9 / 10);
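// The PPS monitor treats the receive path as stalled when the measured packet
// rate drops below 90% of the nominal frame frequency.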
s1->opaque = s;
s1->trx_end_func = trx_ecpri_end;
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
s1->trx_start_func = trx_ecpri_start;
s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate;
//s1->trx_write_func = trx_write_func;
//s1->trx_read_func = trx_read_func;
//s1->trx_set_tx_gain_func = trx_set_tx_gain_func;
//s1->trx_set_rx_gain_func = trx_set_rx_gain_func;
//s1->trx_get_tx_samples_per_packet_func = trx_get_tx_samples_per_packet_func;
//s1->trx_get_stats = trx_get_stats;
//s1->trx_dump_info = trx_dump_info;
//s1->trx_get_abs_tx_power_func = trx_get_abs_tx_power_func;
//s1->trx_get_abs_rx_power_func = trx_get_abs_rx_power_func;
//s1->trx_msg_recv_func = trx_msg_recv_func;
//s1->trx_get_tx_gain_func = trx_get_tx_gain_func;
//s1->trx_get_rx_gain_func = trx_get_rx_gain_func;
//s1->trx_stop_func = trx_stop_func;
//s1->trx_read_timestamp = trx_read_timestamp;
//s1->trx_set_tx_streams = trx_set_tx_streams;
//s1->trx_set_rx_streams = trx_set_rx_streams;
//s1->trx_schedule_read = trx_schedule_read;
//s1->trx_set_agc_func = trx_set_agc_func;
//s1->trx_get_agc_func = trx_get_agc_func;
//s1->trx_msg_send_func = trx_msg_send_func;
return 0;
}
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "trx_driver.h"
#define DEBUG
#define SSE4 /* define if CPU supports SSE4.1 */
#include "private.c"
/* eCPRI Send and Recv */
#define PACKET_SIZE 262
#define FRAME_FREQ INT64_C(3840000)
#define SEND_LIMIT 1250
#define TRX_WB_MAX_PARTS 1000
#define TRX_BUF_MAX_SIZE 1000
static void log_error(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " ERROR [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
exit(EXIT_FAILURE);
}
static void log_info(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " INFO [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#ifdef DEBUG
static void log_debug(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " DEBUG [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#else
#define log_debug(...)
#endif
static int latency_target_fd = -1;
static int32_t latency_target_value = 0;
/* Latency trick
* if the file /dev/cpu_dma_latency exists,
* open it and write a zero into it. This will tell
* the power management system not to transition to
* a high cstate (in fact, the system acts like idle=poll)
* When the fd to /dev/cpu_dma_latency is closed, the behavior
* goes back to the system default.
*
* Documentation/power/pm_qos_interface.txt
*/
void set_latency_target(void) {
struct stat s;
int err;
errno = 0;
err = stat("/dev/cpu_dma_latency", &s);
if (err == -1) {
error(EXIT_FAILURE, errno, "WARN: stat /dev/cpu_dma_latency failed");
return;
}
errno = 0;
latency_target_fd = open("/dev/cpu_dma_latency", O_RDWR);
if (latency_target_fd == -1) {
error(EXIT_FAILURE, errno, "WARN: open /dev/cpu_dma_latency");
return;
}
errno = 0;
err = write(latency_target_fd, &latency_target_value, 4);
if (err < 1) {
error(EXIT_FAILURE, errno, "# error setting cpu_dma_latency to %d!",
latency_target_value);
close(latency_target_fd);
return;
}
printf("# /dev/cpu_dma_latency set to %dus\n", latency_target_value);
}
typedef struct {
volatile void * buffer;
char name[64];
size_t buf_len;
size_t len;
volatile int write_index;
volatile int read_index;
} ring_buffer_t;
typedef struct {
const char * re_mac;
const char * rec_mac;
const char * rec_if;
int recv_affinity;
int send_affinity;
int prepare_affinity;
int decompress_affinity;
int ecpri_period;
int flow_id;
int sample_rate;
} TRXEcpriState;
// Buffers
static ring_buffer_t rx_rbuf;
static ring_buffer_t trx_read_rbuf;
static ring_buffer_t tx_rbuf;
static ring_buffer_t trx_write_rbuf;
static volatile int trx_wb_part[TRX_WB_MAX_PARTS]; // TODO write next index instead of current
static volatile int64_t trx_wb_ts[TRX_WB_MAX_PARTS];
static int trx_wb_part_read_index;
static int trx_wb_part_write_index;
// Locks
pthread_mutex_t tx_mutex;
pthread_cond_t tx_cond;
pthread_mutex_t rx_mutex;
pthread_cond_t rx_cond;
pthread_mutex_t tx_ready_mutex;
pthread_cond_t tx_ready_cond;
sem_t trx_read_sem;
// Counters
static volatile int64_t prepared_frame_count;
static volatile int64_t read_frame_count;
static volatile int64_t sent_frame_count;
// Computed values
static int rxtx_buf_size;
static int ecpri_period_mult;
// Network
static volatile int seq_id;
static int send_sockfd;
static int recv_sockfd;
static struct sockaddr_ll connect_sk_addr;
// Timestamps utils
#define NSEC_PER_SEC INT64_C(1000000000)
static struct timespec int_to_ts(int64_t t) {
struct timespec ts;
ts.tv_sec = t / NSEC_PER_SEC;
ts.tv_nsec = t - (ts.tv_sec * NSEC_PER_SEC);
return ts;
}
static int64_t ts_to_int(struct timespec ts) {
return ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec;
}
static void add_ns(struct timespec *t, int64_t ns) {
t->tv_nsec += ns;
while (t->tv_nsec >= ((int64_t)NSEC_PER_SEC)) {
t->tv_sec += 1;
t->tv_nsec -= NSEC_PER_SEC;
}
}
static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
int64_t diff;
diff = NSEC_PER_SEC * ((int)t1.tv_sec - (int)t2.tv_sec);
diff += ((int)t1.tv_nsec - (int)t2.tv_nsec);
return diff;
}
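// These helpers implement absolute-time pacing: a deadline is advanced with
// add_ns() and waited on with clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, ...),
// as done in send_thread below. Minimal sketch of the pattern:
//   struct timespec next; clock_gettime(CLOCK_TAI, &next);
//   for (;;) { add_ns(&next, period_ns); clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL); }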
static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
}
static void rbuf_update_read_index(ring_buffer_t * rbuf) {
rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len;
}
static int rbuf_read_amount(const ring_buffer_t * rbuf) {
return (rbuf->read_index + rbuf->buf_len - rbuf->write_index) % rbuf->buf_len;
}
static int rbuf_write_amount(const ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
}
#define RBUF_READ(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len));\
rbuf.buffer = (type *) malloc(_buf_len * _len);\
strcpy(rbuf.name, _name);\
rbuf.buf_len = _buf_len;\
rbuf.len = _len;\
rbuf.write_index = 0;\
rbuf.read_index = 0;\
} while(0)
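// Minimal usage sketch of the ring buffer macros (single producer / single
// consumer per buffer assumed):
//   RBUF_INIT(rx_rbuf, "RX ring buffer", 1024, PACKET_SIZE, uint8_t);
//   uint8_t *slot = RBUF_WRITE(rx_rbuf, uint8_t); // next free slot
//   /* ... fill slot ... */
//   rbuf_update_write_index(&rx_rbuf);            // publish it
//   uint8_t *data = RBUF_READ(rx_rbuf, uint8_t);  // oldest unread slot
//   rbuf_update_read_index(&rx_rbuf);             // consume it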
static void *recv_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int ret;
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
for(;;) {
struct mmsghdr msgh[4000];
struct iovec msgv[4000];
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < ecpri_period_mult; j++) {
msgv[j].iov_base = RBUF_WRITE(rx_rbuf, uint8_t);
msgv[j].iov_len = rx_rbuf.len;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
rbuf_update_write_index(&rx_rbuf);
}
ret = recvmmsg(recv_sockfd, msgh, ecpri_period_mult, 0, NULL);
if(ret == -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
if(ret != ecpri_period_mult)
log_error("RECV_THREAD", "recvmmsg received %d messages instead of %d\n", ret, ecpri_period_mult);
pthread_mutex_lock(&rx_mutex);
pthread_cond_signal(&rx_cond);
pthread_mutex_unlock(&rx_mutex);
}
pthread_exit(EXIT_SUCCESS);
}
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial, next;
struct timespec t1[4000];
struct timespec t2[4000];
int k = 0;
TRXEcpriState * s = (TRXEcpriState *) p;
struct mmsghdr msgh[4000];
struct iovec msgv[4000];
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < ecpri_period_mult; j++) {
msgh[j].msg_hdr.msg_name = &connect_sk_addr;
msgh[j].msg_hdr.msg_namelen = sizeof(connect_sk_addr);
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
}
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_wait(&tx_ready_cond, &tx_ready_mutex);
pthread_mutex_unlock(&tx_ready_mutex);
clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) {
int ret, msg_sent;
#ifdef DEBUG
if(i > SEND_LIMIT) {
int64_t d, dt;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
for(int j = 0; j < k; j++) {
dt = calcdiff_ns(t2[j], t1[j]);
log_debug("SEND_THREAD", "%" PRIi64, dt);
}
log_debug("SEND_THREAD", "Packets sent: %" PRIi64, sent_frame_count);
log_debug("SEND_THREAD", "Duration: %" PRIi64, d);
log_debug("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_debug("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
exit(EXIT_SUCCESS);
}
#endif
next = initial;
// Multiply by i every time to prevent any frequency drift
add_ns(&next, (ecpri_period_mult * NSEC_PER_SEC * i) / FRAME_FREQ);
for(int j = 0; j < ecpri_period_mult; j++) {
msgv[j].iov_base = RBUF_READ(tx_rbuf, uint8_t);
msgv[j].iov_len = tx_rbuf.len;
rbuf_update_read_index(&tx_rbuf);
}
for(msg_sent = 0; msg_sent < ecpri_period_mult;) {
#ifdef DEBUG
clock_gettime(CLOCK_TAI, &t1[k]);
#endif
ret = sendmmsg(send_sockfd, msgh + msg_sent, (ecpri_period_mult - msg_sent), 0);
#ifdef DEBUG
clock_gettime(CLOCK_TAI, &t2[k++]);
#endif
if(ret <= 0)
error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret);
msg_sent += ret;
sent_frame_count += ret;
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_signal(&tx_cond);
pthread_mutex_unlock(&tx_mutex);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
static void *prepare_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int tx_ready_buffer_full = 0;
log_info("PREPARE_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->prepare_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->prepare_affinity);
for(int64_t i = 0;; i++) {
int16_t samples_int[256];
// If we have frames to prepare
int n = rbuf_write_amount(&tx_rbuf);
if((i == 0) || n) {
// If there are frames from trx_write callback to prepare
if(rbuf_read_amount(&trx_write_rbuf)) {
int64_t ts = trx_wb_ts[trx_wb_part_read_index];
int empty_frames_ahead = ts - prepared_frame_count;
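// empty_frames_ahead > 0: pad with empty frames until the next trx_write
// block's timestamp; == 0: encode the queued TRX samples; < 0: the timestamp
// was already missed (error below).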
empty_frames_ahead = empty_frames_ahead < n ? empty_frames_ahead : n;
if(empty_frames_ahead > 0) {
for(int j = 0; j < empty_frames_ahead; j++) {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
prepared_frame_count++;
}
}
else if (empty_frames_ahead == 0) {
int m = trx_wb_part[(trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS] - trx_write_rbuf.read_index;
m = m < n ? m : n;
for(int j = 0; j < m; j++) {
float * const trx_samples = RBUF_READ(trx_write_rbuf, float);
uint8_t * const tx_frame = RBUF_WRITE(tx_rbuf, uint8_t);
memset(samples_int, 0, 512);
float_to_int16(samples_int, trx_samples, 256, 32767);
encode_bf1(tx_frame + 22 , samples_int);
encode_bf1(tx_frame + 22 + 60 , samples_int + 64);
encode_bf1(tx_frame + 22 + 120, samples_int + 128);
encode_bf1(tx_frame + 22 + 180, samples_int + 192);
*((uint16_t *)(tx_frame + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
rbuf_update_read_index(&trx_write_rbuf);
prepared_frame_count++;
}
if(m == 0)
trx_wb_part_read_index = (trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS;
}
else {
log_error("PREPARE_THREAD", "missed trx_write timestamp");
}
}
else {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
prepared_frame_count++;
}
}
else {
if (!tx_ready_buffer_full) {
tx_ready_buffer_full = 1;
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_signal(&tx_ready_cond);
pthread_mutex_unlock(&tx_ready_mutex);
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_wait(&tx_cond, &tx_mutex);
pthread_mutex_unlock(&tx_mutex);
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *decompress_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int rx_ready = 0;
const float mult = 1. / 32767.;
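// Decoded samples are int16; scaling by 1/32767 maps them back to floats in
// [-1.0, 1.0] for the TRX read path.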
log_info("DECOMPRESS_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->decompress_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decompress_affinity);
for(;;) {
int n = rbuf_read_amount(&rx_rbuf);
if(n) {
for(int j = 0; j < n; j++) {
int16_t samples_int[256];
const uint8_t * rx_samples = RBUF_READ(rx_rbuf, uint8_t) + 22;
// TODO : analyze seq_id, ecpri packet type etc... ?
// TODO : set rx_ready at some point (when ?)
if(rx_ready) {
memset(samples_int, 0, 512);
decode_bf1(samples_int , rx_samples , 16);
decode_bf1(samples_int + 64 , rx_samples + 60, 16);
decode_bf1(samples_int + 128, rx_samples + 120, 16);
decode_bf1(samples_int + 192, rx_samples + 180, 16);
int16_to_float(RBUF_WRITE(trx_read_rbuf, float), samples_int, 256, mult);
rbuf_update_read_index(&rx_rbuf);
rbuf_update_write_index(&trx_read_rbuf);
sem_post(&trx_read_sem);
}
}
}
else {
pthread_mutex_lock(&rx_mutex);
pthread_cond_wait(&rx_cond, &rx_mutex);
pthread_mutex_unlock(&rx_mutex);
}
}
pthread_exit(EXIT_SUCCESS);
}
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t send_pthread;
pthread_t prepare_pthread;
pthread_t decompress_pthread;
struct sched_param recv_param;
struct sched_param send_param;
struct sched_param prepare_param;
struct sched_param decompress_param;
pthread_attr_t recv_attr;
pthread_attr_t send_attr;
pthread_attr_t prepare_attr;
pthread_attr_t decompress_attr;
log_info("TRX_ECPRI", "Starting threads");
// Initialize pthread attributes (default values)
if (pthread_attr_init(&recv_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
// Set a specific stack size
if (pthread_attr_setstacksize(&recv_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
// Set scheduler policy and priority of pthread
if (pthread_attr_setschedpolicy(&recv_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv_attr, &recv_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
/* Use scheduling parameters of attr */
if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&send_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
send_param.sched_priority = 97;
if (pthread_attr_setschedparam(&send_attr, &send_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&prepare_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&prepare_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&prepare_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
prepare_param.sched_priority = 97;
if (pthread_attr_setschedparam(&prepare_attr, &prepare_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&prepare_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&decompress_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&decompress_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&decompress_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
decompress_param.sched_priority = 97;
if (pthread_attr_setschedparam(&decompress_attr, &decompress_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&decompress_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread");
if (pthread_create(&prepare_pthread, NULL, prepare_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create prepare thread");
if (pthread_create(&decompress_pthread, NULL, decompress_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decompress thread");
return 0;
}
int start(TRXEcpriState * s) {
uint8_t dst_mac[6];
uint8_t src_mac[6];
uint8_t ecpri_packet[PACKET_SIZE];
struct ether_header *eh = (struct ether_header *) ecpri_packet;
int if_index;
log_debug("TRX_ECPRI", "raw socket setup");
//set_latency_target();
seq_id = 0;
read_frame_count = 0;
sent_frame_count = 0;
prepared_frame_count = 0;
ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000;
rxtx_buf_size = (3 * ecpri_period_mult);
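// ecpri_period is expressed in microseconds, so ecpri_period_mult is the
// number of frames exchanged per eCPRI period; the RX/TX ring buffers hold
// three such periods.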
RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, PACKET_SIZE, uint8_t);
RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, 256, float);
RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, 256, float);
trx_wb_part_read_index = 0;
trx_wb_part_write_index = 0;
pthread_mutex_init(&tx_mutex, NULL);
pthread_mutex_init(&rx_mutex, NULL);
pthread_mutex_init(&tx_ready_mutex, NULL);
pthread_cond_init(&tx_cond, NULL);
pthread_cond_init(&rx_cond, NULL);
pthread_cond_init(&tx_ready_cond, NULL);
sem_init(&trx_read_sem, 0, 0);
memset((uint8_t *) ecpri_packet, 0, PACKET_SIZE);
if (!(if_index = if_nametoindex(s->rec_if))) {
perror("if_nametoindex");
return 1;
}
if(sscanf(s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &dst_mac[0], &dst_mac[1], &dst_mac[2], &dst_mac[3], &dst_mac[4], &dst_mac[5]) != 6)
fprintf(stderr, "Invalid eRE MAC address\n");
if(sscanf(s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &src_mac[0], &src_mac[1], &src_mac[2], &src_mac[3], &src_mac[4], &src_mac[5]) != 6)
fprintf(stderr, "Invalid eREC MAC address\n");
if ((send_sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) {
perror("Socket Error");
return 1;
}
if ((recv_sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) {
perror("Socket Error");
return 1;
}
connect_sk_addr.sll_ifindex = if_index;
connect_sk_addr.sll_halen = ETH_ALEN;
for(int i = 0; i < 6; i++)
connect_sk_addr.sll_addr[i] = dst_mac[i];
log_debug("TRX_ECPRI", "bind");
for(int i = 0; i < 6; i++)
eh->ether_shost[i] = src_mac[i];
for(int i = 0; i < 6; i++)
eh->ether_dhost[i] = dst_mac[i];
/* Ethertype field */
eh->ether_type = htons(0xaefe);
/* Standard Header */
ecpri_packet[14] = 0x10; // Protocol data revision 0x1, C = 0
// Message type = 0x00, IQ data
// Payload size
*((uint16_t *) (ecpri_packet + 16)) = htons(244);
*((uint16_t *) (ecpri_packet + 18)) = htons(s->flow_id);
for(int i = 0; i < rxtx_buf_size; i++) {
//log_debug("TRX_ECPRI", "%d / %d - %d\n", i, rxtx_buf_size, tx_rbuf.len);
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_packet, tx_rbuf.len);
}
start_threads(s);
return 0;
}
static void trx_ecpri_end(TRXState *s1)
{
log_info("TRX_ECPRI", "End");
TRXEcpriState *s = s1->opaque;
free(s);
}
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int write_count = count >> 5;
int64_t ts = timestamp >> 5;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index;
trx_wb_ts[trx_wb_part_write_index] = ts;
for(int k = 0; k < write_count; k++) {
for(int i = 0; i < 4; i++)
for(int j = 0; j < 64; j++)
RBUF_WRITE(trx_write_rbuf, float)[i * 64 + j] = _samples[i][j + (k << 6)];
rbuf_update_write_index(&trx_write_rbuf);
}
trx_wb_part_write_index = (trx_wb_part_write_index + 1) % TRX_WB_MAX_PARTS;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index + write_count;
}
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int read_count = count >> 5;
for(int k = 0; k < read_count; k++) {
float * trx_samples;
sem_wait(&trx_read_sem);
trx_samples = RBUF_READ(trx_read_rbuf, float);
for(int i = 0; i < 4; i++)
for(int j = 0; j < 64; j++)
_samples[i][j + (k << 6)] = trx_samples[i * 64 + j];
rbuf_update_read_index(&trx_read_rbuf);
}
*ptimestamp = read_frame_count << 5;
read_frame_count += read_count;
return count;
}
/* This function can be used to automatically set the sample
rate. Here we don't implement it, so the user has to force a given
sample rate with the "sample_rate" configuration option */
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
int *psample_rate_num, int sample_rate_min)
{
return -1;
}
static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
{
TRXEcpriState *s = s1->opaque;
s->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den;
start(s);
return 0;
}
int trx_driver_init(TRXState *s1)
{
TRXEcpriState *s;
double val;
// Lock all current and future pages to prevent them from being paged out to swap
if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
log_error("TRX_ECPRI", "mlockall failed");
}
log_info("TRX_ECPRI", "Init");
if (s1->trx_api_version != TRX_API_VERSION) {
fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n",
s1->trx_api_version, TRX_API_VERSION);
return -1;
}
s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s));
trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "prepare_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "decompress_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id");
s->flow_id = (int) val;
trx_get_param_double(s1, &val, "ecpri_period");
s->ecpri_period = (int) val;
s->re_mac = trx_get_param_string(s1, "re_mac");
s->rec_mac = trx_get_param_string(s1, "rec_mac");
s->rec_if = trx_get_param_string(s1, "rec_if");
s1->opaque = s;
s1->trx_end_func = trx_ecpri_end;
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
s1->trx_start_func = trx_ecpri_start;
s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate;
return 0;
}