Commit b5e2c148 authored by Joanne Hugé's avatar Joanne Hugé

Add option to dump ecpri frames, clean up

parent a589b6d7
...@@ -35,6 +35,8 @@ typedef struct { ...@@ -35,6 +35,8 @@ typedef struct {
const char * re_mac; const char * re_mac;
const char * rec_mac; const char * rec_mac;
const char * rec_if; const char * rec_if;
const char * dpdk_options;
const char * trace_file;
int recv_affinity; int recv_affinity;
int send_affinity; int send_affinity;
int prepare_affinity; int prepare_affinity;
...@@ -43,6 +45,7 @@ typedef struct { ...@@ -43,6 +45,7 @@ typedef struct {
int ecpri_period; int ecpri_period;
int flow_id; int flow_id;
int sample_rate; int sample_rate;
int trace_period;
} TRXEcpriState; } TRXEcpriState;
static void log_error(const char * section, const char * msg, ...) { static void log_error(const char * section, const char * msg, ...) {
time_t t; time_t t;
...@@ -83,12 +86,46 @@ int main(int argc, char * argv[]) { ...@@ -83,12 +86,46 @@ int main(int argc, char * argv[]) {
(void) argv; (void) argv;
TRXEcpriState *s; TRXEcpriState *s;
// Lock all current and future pages from preventing of being paged to
// swap
if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
log_error("TRX_ECPRI", "mlockall failed");
}
s = malloc(sizeof(TRXEcpriState)); s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s)); memset(s, 0, sizeof(*s));
s->rec_mac = "b8:59:9f:07:86:42"; // Tiogapass001 mellanox
s->re_mac = "04:09:a5:0f:9f:4c"; //s->rec_mac = "b8:59:9f:07:7d:da";
//s->re_mac = "04:09:a5:0f:9f:4c";
//s->rec_if = "ens9f0np0";
// Loopback
//s->rec_mac = "b8:ce:f6:4b:00:22";
//s->re_mac = "b8:ce:f6:4b:00:23";
//s->rec_if = "ens5f0np0";
//s->rec_mac = "b4:96:91:a7:1c:f4";
//s->re_mac = "04:09:a5:0f:9f:4c";
//s->rec_if = "ens5f0";
// tiogapass003 mellanox
//s->rec_mac = "b8:59:9f:07:7e:2a";
//s->re_mac = "b8:59:9f:07:86:42";
//s->rec_if = "ens9f0";
//s->dpdk_options = " -l 28 -b 0000:04:00.0 -b 0000:5e:00.1 ";
// HFR tiogapass mellanox
s->rec_mac = "b8:59:9f:07:82:ca";
s->re_mac = "04:09:a5:0f:76:1c";
s->rec_if = "ens9f0"; s->rec_if = "ens9f0";
s->dpdk_options = " -l 28 -b 0000:04:00.0 -b 0000:5e:00.1 ";
// tiogapass004 mellanox
//s->rec_mac = "b8:59:9f:07:86:42";
//s->re_mac = "b8:59:9f:07:7e:2a";
//s->rec_if = "ens9f0";
//s->dpdk_options = " -l 28 -b 0000:04:00.0 -b 0000:3b:00.0 -b 0000:3b:00.1 -b 0000:5e:00.1 ";
s->recv_affinity = 39; s->recv_affinity = 39;
s->send_affinity = 38; s->send_affinity = 38;
...@@ -99,6 +136,10 @@ int main(int argc, char * argv[]) { ...@@ -99,6 +136,10 @@ int main(int argc, char * argv[]) {
s->flow_id = 0; s->flow_id = 0;
s->sample_rate = 122880000; s->sample_rate = 122880000;
s->trace_file = "/root/ecpri_trace";
s->trace_period = 1000000;
log_info("TEST-DPDK-ECPRI", "Starting test...\n"); log_info("TEST-DPDK-ECPRI", "Starting test...\n");
log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if); log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if);
......
chrt -f 97 taskset -c 38 ptp4l -H -i ens5f0 -m -f $HOME/linuxptp/configs/G.8275.1.cfg chrt -f 97 taskset -c 38 ptp4l -H -i ens9f0 -m -f $HOME/linuxptp/configs/G.8275.1.cfg
...@@ -45,6 +45,8 @@ ...@@ -45,6 +45,8 @@
#define DEBUG #define DEBUG
#define SSE4 /* define if CPU supports SSE4.1 */ #define SSE4 /* define if CPU supports SSE4.1 */
// Tiogapass004
//#define DST_ADDR_SYNTAX
#include "private.c" #include "private.c"
...@@ -171,6 +173,7 @@ typedef struct { ...@@ -171,6 +173,7 @@ typedef struct {
const char * rec_mac; const char * rec_mac;
const char * rec_if; const char * rec_if;
const char * dpdk_options; const char * dpdk_options;
const char * trace_file;
int recv_affinity; int recv_affinity;
int send_affinity; int send_affinity;
int prepare_affinity; int prepare_affinity;
...@@ -179,6 +182,7 @@ typedef struct { ...@@ -179,6 +182,7 @@ typedef struct {
int ecpri_period; int ecpri_period;
int flow_id; int flow_id;
int sample_rate; int sample_rate;
int trace_period;
} TRXEcpriState; } TRXEcpriState;
// Buffers // Buffers
...@@ -335,6 +339,7 @@ static void send_packets(int port) { ...@@ -335,6 +339,7 @@ static void send_packets(int port) {
int pkt_size; int pkt_size;
pkt[i] = rte_pktmbuf_alloc(mbuf_pool); pkt[i] = rte_pktmbuf_alloc(mbuf_pool);
eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*); eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*);
#ifdef DST_ADDR_SYNTAX
if(port) { if(port) {
eth_hdr->dst_addr = s_addr; eth_hdr->dst_addr = s_addr;
eth_hdr->src_addr = d_addr; eth_hdr->src_addr = d_addr;
...@@ -342,6 +347,15 @@ static void send_packets(int port) { ...@@ -342,6 +347,15 @@ static void send_packets(int port) {
eth_hdr->dst_addr = d_addr; eth_hdr->dst_addr = d_addr;
eth_hdr->src_addr = s_addr; eth_hdr->src_addr = s_addr;
} }
#else
if(port) {
eth_hdr->d_addr = s_addr;
eth_hdr->s_addr = d_addr;
} else {
eth_hdr->d_addr = d_addr;
eth_hdr->s_addr = s_addr;
}
#endif
eth_hdr->ether_type = htons(0xaefe); eth_hdr->ether_type = htons(0xaefe);
memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), data[i], DATA_SIZE); memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), data[i], DATA_SIZE);
//pkt_size = DATA_SIZE + sizeof(struct rte_ether_hdr); //pkt_size = DATA_SIZE + sizeof(struct rte_ether_hdr);
...@@ -365,12 +379,17 @@ static void send_packets(int port) { ...@@ -365,12 +379,17 @@ static void send_packets(int port) {
} }
// TODO store received packets' data in buffer // TODO store received packets' data in buffer
static int recv_packets(int port) { static int recv_packets(int port) {
struct rte_mbuf * pkt[BURST_SIZE]; struct rte_mbuf * pkt[1024];
while(1) { while(1) {
const int nb_rx = rte_eth_rx_burst(port, 0, pkt, 1024); const int nb_rx = rte_eth_rx_burst(port, 0, pkt, 1024);
for(int i = 0; i < nb_rx; i++) for(int i = 0; i < nb_rx; i++) {
for(int j = 0; j < PACKET_SIZE; j++) {
*(RBUF_WRITE(rx_rbuf, uint8_t) + j) = *rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, j);
}
rbuf_update_write_index(&rx_rbuf);
rte_pktmbuf_free(pkt[i]); rte_pktmbuf_free(pkt[i]);
}
if(nb_rx) if(nb_rx)
return nb_rx; return nb_rx;
...@@ -395,10 +414,9 @@ static void *recv_thread(void *p) { ...@@ -395,10 +414,9 @@ static void *recv_thread(void *p) {
recv_frame_count += recv_packets(0); recv_frame_count += recv_packets(0);
for(int j = 0; j < ecpri_period_mult; j++) { //for(int j = 0; j < ecpri_period_mult; j++) {
// TODO write rx_buf // TODO write rx_buf
rbuf_update_write_index(&rx_rbuf); //}
}
pthread_mutex_lock(&rx_mutex); pthread_mutex_lock(&rx_mutex);
pthread_cond_signal(&rx_cond); pthread_cond_signal(&rx_cond);
...@@ -546,22 +564,50 @@ static void *decompress_thread(void *p) { ...@@ -546,22 +564,50 @@ static void *decompress_thread(void *p) {
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
int rx_ready = 0; int rx_ready = 0;
const float mult = 1. / 32767.; const float mult = 1. / 32767.;
FILE * trace_file_desc;
log_info("DECOMPRESS_THREAD", "Thread init"); log_info("DECOMPRESS_THREAD", "Thread init");
if(s->trace_period)
trace_file_desc = fopen(s->trace_file, "w+");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->decompress_affinity, &mask); CPU_SET(s->decompress_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decompress_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decompress_affinity);
for(;;) { for(uint64_t k = 0;;) {
int n = rbuf_read_amount(&rx_rbuf); int n = rbuf_read_amount(&rx_rbuf);
if(n) { if(n) {
for(int j = 0; j < n; j++) { for(int j = 0; j < n; j++) {
int16_t samples_int[N_SAMPLES]; int16_t samples_int[N_SAMPLES];
const uint8_t * dst_mac = RBUF_READ(rx_rbuf, uint8_t);
const uint8_t * src_mac = RBUF_READ(rx_rbuf, uint8_t) + 6;
const uint16_t ether_type = (uint16_t) *(RBUF_READ(rx_rbuf, uint8_t) + 12);
const uint8_t ecpri_protocol_rev = *(RBUF_READ(rx_rbuf, uint8_t) + 14);
const uint8_t ecpri_message_type = *(RBUF_READ(rx_rbuf, uint8_t) + 15);
const uint8_t ecpri_payload_size = *(RBUF_READ(rx_rbuf, uint8_t) + 16);
const uint16_t pc_id = (uint16_t) *(RBUF_READ(rx_rbuf, uint8_t) + 18);
const uint16_t seq_id = (uint16_t) *(RBUF_READ(rx_rbuf, uint8_t) + 20);
const uint8_t * rx_samples = RBUF_READ(rx_rbuf, uint8_t) + 22; const uint8_t * rx_samples = RBUF_READ(rx_rbuf, uint8_t) + 22;
if(s->trace_period && !(k % s->trace_period)) {
fprintf(trace_file_desc,
"%010" PRIu64 " %x:%x:%x:%x:%x:%x %x:%x:%x:%x:%x:%x %x\n"
" %x %x %u\n"
" %x %x\n",
k,
dst_mac[0], dst_mac[1], dst_mac[2], dst_mac[3], dst_mac[4], dst_mac[5],
src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5],
ether_type,
ecpri_protocol_rev, ecpri_message_type, ecpri_payload_size,
pc_id, seq_id);
}
k++;
// TODO : analyze seq_id, ecpri packet type etc... ? // TODO : analyze seq_id, ecpri packet type etc... ?
// TODO : set rx_ready at some point (when ?) // TODO : set rx_ready at some point (when ?)
rbuf_update_read_index(&rx_rbuf);
if(rx_ready) { if(rx_ready) {
memset(samples_int, 0, 512); memset(samples_int, 0, 512);
...@@ -571,7 +617,6 @@ static void *decompress_thread(void *p) { ...@@ -571,7 +617,6 @@ static void *decompress_thread(void *p) {
decode_bf1(samples_int + 192, rx_samples + 180, 16); decode_bf1(samples_int + 192, rx_samples + 180, 16);
int16_to_float(RBUF_WRITE(trx_read_rbuf, float), samples_int, N_SAMPLES, mult); int16_to_float(RBUF_WRITE(trx_read_rbuf, float), samples_int, N_SAMPLES, mult);
rbuf_update_read_index(&rx_rbuf);
rbuf_update_write_index(&trx_read_rbuf); rbuf_update_write_index(&trx_read_rbuf);
sem_post(&trx_read_sem); sem_post(&trx_read_sem);
} }
...@@ -606,6 +651,13 @@ static void *statistic_thread(void *p) { ...@@ -606,6 +651,13 @@ static void *statistic_thread(void *p) {
clock_gettime(CLOCK_TAI, &initial); clock_gettime(CLOCK_TAI, &initial);
next = initial; next = initial;
log_info("STATS", "%14s - %14s - %14s - %14s - %14s %14s ",
"prepared",
"read",
"sent",
"received",
"pps",
"ppsr");
for(;;) { for(;;) {
int64_t pps, ppsr; int64_t pps, ppsr;
add_ns(&next, STATISTIC_REFRESH_RATE); add_ns(&next, STATISTIC_REFRESH_RATE);
...@@ -768,7 +820,7 @@ int startdpdk(TRXEcpriState * s) { ...@@ -768,7 +820,7 @@ int startdpdk(TRXEcpriState * s) {
ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000; ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000;
rxtx_buf_size = (3 * ecpri_period_mult); rxtx_buf_size = (3 * ecpri_period_mult);
RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, DATA_SIZE, uint8_t); RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, DATA_SIZE, uint8_t); RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, DATA_SIZE, uint8_t);
RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float); RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float);
RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float); RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float);
...@@ -920,6 +972,8 @@ int trx_driver_init(TRXState *s1) ...@@ -920,6 +972,8 @@ int trx_driver_init(TRXState *s1)
s->statistic_affinity = (int) val; s->statistic_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id"); trx_get_param_double(s1, &val, "flow_id");
s->flow_id = (int) val; s->flow_id = (int) val;
trx_get_param_double(s1, &val, "trace_period");
s->trace_period = (int) val;
trx_get_param_double(s1, &val, "ecpri_period"); trx_get_param_double(s1, &val, "ecpri_period");
if(((int) val) == 0) { if(((int) val) == 0) {
fprintf(stderr, "ecpri_period parameter can't be null\n"); fprintf(stderr, "ecpri_period parameter can't be null\n");
...@@ -931,6 +985,7 @@ int trx_driver_init(TRXState *s1) ...@@ -931,6 +985,7 @@ int trx_driver_init(TRXState *s1)
s->rec_mac = trx_get_param_string(s1, "rec_mac"); s->rec_mac = trx_get_param_string(s1, "rec_mac");
s->rec_if = trx_get_param_string(s1, "rec_if"); s->rec_if = trx_get_param_string(s1, "rec_if");
s->dpdk_options = trx_get_param_string(s1, "dpdk_options"); s->dpdk_options = trx_get_param_string(s1, "dpdk_options");
s->trace_file = trx_get_param_string(s1, "trace_file");
s1->opaque = s; s1->opaque = s;
s1->trx_end_func = trx_ecpri_end; s1->trx_end_func = trx_ecpri_end;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment