Commit a0ce7a7c authored by Joanne Hugé's avatar Joanne Hugé

wip: debug losing rx packets

parent 4dc27f6c
...@@ -137,8 +137,9 @@ typedef struct { ...@@ -137,8 +137,9 @@ typedef struct {
int rx_n_channel; int rx_n_channel;
int tx_n_channel; int tx_n_channel;
int statistics_refresh_rate_ns; int statistics_refresh_rate_ms;
int sample_rate; int sample_rate;
int rx_drop_pcm;
} TRXEcpriState; } TRXEcpriState;
typedef struct { typedef struct {
...@@ -201,12 +202,13 @@ static counter_stat_t write_counter; // IQs to write from TRX ...@@ -201,12 +202,13 @@ static counter_stat_t write_counter; // IQs to write from TRX
static counter_stat_t sent_counter; // IQs sent to RRH static counter_stat_t sent_counter; // IQs sent to RRH
static counter_stat_t rx_drop_counter; // rx packets dropped by driver static counter_stat_t rx_drop_counter; // rx packets dropped by driver
static counter_stat_t tx_drop_counter; // tx packets dropped by driver static counter_stat_t tx_drop_counter; // tx packets dropped by driver
static counter_stat_t lost_rx_counter; // gaps between seq_id's static counter_stat_t lost_rx_counter[MAX_CHANNELS]; // gaps between seq_id's
static counter_stat_t rx_packet_counter; // packets received from RRH static counter_stat_t rx_packet_counter; // packets received from RRH
static counter_stat_t tx_packet_counter; // packets sent from TRX static counter_stat_t tx_packet_counter; // packets sent from TRX
// Network // Network
static int tx_seq_id; static int tx_seq_id;
static int seq_id_offset[MAX_CHANNELS];
static uint8_t frame_id; static uint8_t frame_id;
static uint8_t subframe_id; static uint8_t subframe_id;
static uint8_t slot_id; static uint8_t slot_id;
...@@ -310,10 +312,13 @@ static void print_stats(FILE * f, int print_header) { ...@@ -310,10 +312,13 @@ static void print_stats(FILE * f, int print_header) {
int offset = 0; int offset = 0;
if(print_header) { if(print_header) {
fprintf(f, fprintf(f,
"%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n", "%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n",
"rxdrop", "rxdrop",
"txdrop", "txdrop",
"lostrx", "lstrx0",
"lstrx1",
"lstrx2",
"lstrx3",
"recv 0", "recv 0",
"recv 1", "recv 1",
"recv 2", "recv 2",
...@@ -329,13 +334,19 @@ static void print_stats(FILE * f, int print_header) { ...@@ -329,13 +334,19 @@ static void print_stats(FILE * f, int print_header) {
"tx pkt", "tx pkt",
"rx pps", "rx pps",
"tx pps", "tx pps",
"rxbufw", "rxbuf0",
"rxbuf1",
"rxbuf2",
"rxbuf3",
"txbufw"); "txbufw");
} }
offset = 0; offset = 0;
int64_to_6s(s + 7 * offset++, rx_drop_counter.counter); int64_to_6s(s + 7 * offset++, rx_drop_counter.counter);
int64_to_6s(s + 7 * offset++, tx_drop_counter.counter); int64_to_6s(s + 7 * offset++, tx_drop_counter.counter);
int64_to_6s(s + 7 * offset++, lost_rx_counter.counter); int64_to_6s(s + 7 * offset++, lost_rx_counter[0].counter);
int64_to_6s(s + 7 * offset++, lost_rx_counter[1].counter);
int64_to_6s(s + 7 * offset++, lost_rx_counter[2].counter);
int64_to_6s(s + 7 * offset++, lost_rx_counter[3].counter);
int64_to_6s(s + 7 * offset++, recv_counter[0].counter); int64_to_6s(s + 7 * offset++, recv_counter[0].counter);
int64_to_6s(s + 7 * offset++, recv_counter[1].counter); int64_to_6s(s + 7 * offset++, recv_counter[1].counter);
int64_to_6s(s + 7 * offset++, recv_counter[2].counter); int64_to_6s(s + 7 * offset++, recv_counter[2].counter);
...@@ -343,7 +354,10 @@ static void print_stats(FILE * f, int print_header) { ...@@ -343,7 +354,10 @@ static void print_stats(FILE * f, int print_header) {
int64_to_6s(s + 7 * offset++, read_counter.counter); int64_to_6s(s + 7 * offset++, read_counter.counter);
int64_to_6s(s + 7 * offset++, write_counter.counter); int64_to_6s(s + 7 * offset++, write_counter.counter);
int64_to_6s(s + 7 * offset++, sent_counter.counter); int64_to_6s(s + 7 * offset++, sent_counter.counter);
int64_to_6s(s + 7 * offset++, recv_counter[0].freq); int64_to_6s(s + 7 * offset++, recv_counter[0].freq +
recv_counter[1].freq +
recv_counter[2].freq +
recv_counter[3].freq);
int64_to_6s(s + 7 * offset++, read_counter.freq); int64_to_6s(s + 7 * offset++, read_counter.freq);
int64_to_6s(s + 7 * offset++, write_counter.freq); int64_to_6s(s + 7 * offset++, write_counter.freq);
int64_to_6s(s + 7 * offset++, sent_counter.freq); int64_to_6s(s + 7 * offset++, sent_counter.freq);
...@@ -351,11 +365,14 @@ static void print_stats(FILE * f, int print_header) { ...@@ -351,11 +365,14 @@ static void print_stats(FILE * f, int print_header) {
int64_to_6s(s + 7 * offset++, tx_packet_counter.counter); int64_to_6s(s + 7 * offset++, tx_packet_counter.counter);
int64_to_6s(s + 7 * offset++, rx_packet_counter.freq); int64_to_6s(s + 7 * offset++, rx_packet_counter.freq);
int64_to_6s(s + 7 * offset++, tx_packet_counter.freq); int64_to_6s(s + 7 * offset++, tx_packet_counter.freq);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[0])); int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[0]) / 4);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxw_rbuf[0])); int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[1]) / 4);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[2]) / 4);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[3]) / 4);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxw_rbuf[0]) / 4);
offset = 0; offset = 0;
fprintf(f, fprintf(f,
"%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n", "%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n",
s + 7 * 0, s + 7 * 0,
s + 7 * 1, s + 7 * 1,
s + 7 * 2, s + 7 * 2,
...@@ -375,7 +392,13 @@ static void print_stats(FILE * f, int print_header) { ...@@ -375,7 +392,13 @@ static void print_stats(FILE * f, int print_header) {
s + 7 * 16, s + 7 * 16,
s + 7 * 17, s + 7 * 17,
s + 7 * 18, s + 7 * 18,
s + 7 * 19); s + 7 * 19,
s + 7 * 20,
s + 7 * 21,
s + 7 * 22,
s + 7 * 23,
s + 7 * 24,
s + 7 * 25);
} }
static void log_exit(const char * section, const char * msg, ...) { static void log_exit(const char * section, const char * msg, ...) {
...@@ -457,6 +480,8 @@ static void *recv_thread(void *p) { ...@@ -457,6 +480,8 @@ static void *recv_thread(void *p) {
int stop = 0; int stop = 0;
int prev_seq_id[MAX_CHANNELS] = {-1, -1, -1, -1}; int prev_seq_id[MAX_CHANNELS] = {-1, -1, -1, -1};
uint16_t antenna_id; uint16_t antenna_id;
uint16_t payload_size;
uint8_t seq_id;
struct mmsghdr msgh[MAX_RX_BURST]; struct mmsghdr msgh[MAX_RX_BURST];
struct iovec msgv[MAX_RX_BURST]; struct iovec msgv[MAX_RX_BURST];
ecpri_header * header; ecpri_header * header;
...@@ -488,6 +513,8 @@ static void *recv_thread(void *p) { ...@@ -488,6 +513,8 @@ static void *recv_thread(void *p) {
int ret = recvmmsg(recv_sockfd, msgh, s->rx_burst, 0, NULL); int ret = recvmmsg(recv_sockfd, msgh, s->rx_burst, 0, NULL);
if(ret <= -1) if(ret <= -1)
error(EXIT_FAILURE, errno, "recvmmsg error"); error(EXIT_FAILURE, errno, "recvmmsg error");
if((i % 100000) < s->rx_drop_pcm)
continue;
// Process each received message // Process each received message
for(int j = 0; j < ret; j++) { for(int j = 0; j < ret; j++) {
header = (ecpri_header*) (rx_buf + j * MAX_PACKET_SIZE); header = (ecpri_header*) (rx_buf + j * MAX_PACKET_SIZE);
...@@ -523,11 +550,12 @@ static void *recv_thread(void *p) { ...@@ -523,11 +550,12 @@ static void *recv_thread(void *p) {
stop = 1; break; stop = 1; break;
} }
antenna_id = ntohs(header->antenna_id); antenna_id = ntohs(header->antenna_id);
seq_id = header->seq_id;
// Exit if SEQ ID is not sequential // Exit if SEQ ID is not sequential
if ( prev_seq_id[antenna_id] != -1 && (header->seq_id + 256 - prev_seq_id[antenna_id]) % 256 != 1 ) { if ( prev_seq_id[antenna_id] != -1 && (seq_id + 256 - prev_seq_id[antenna_id]) % 256 != 1 ) {
//log_info("RECV_THREAD", "seq_ids are not sequential (%d, %d)", //log_info("RECV_THREAD", "seq_ids are not sequential (%d, %d)",
// prev_seq_id[antenna_id], header->seq_id); // prev_seq_id[antenna_id], seq_id);
update_counter(&lost_rx_counter, (header->seq_id + 256 - prev_seq_id[antenna_id]) % 256); update_counter(&lost_rx_counter[antenna_id], (seq_id + 256 - prev_seq_id[antenna_id]) % 256);
//stop = 1; break; //stop = 1; break;
} }
// Exit if antenna ID is not in range // Exit if antenna ID is not in range
...@@ -542,15 +570,16 @@ static void *recv_thread(void *p) { ...@@ -542,15 +570,16 @@ static void *recv_thread(void *p) {
log_exit("RECV_THREAD", "No more space in %s buffer", log_exit("RECV_THREAD", "No more space in %s buffer",
trxr_rbuf[antenna_id].name); trxr_rbuf[antenna_id].name);
} }
prev_seq_id[antenna_id] = seq_id;
prev_seq_id[antenna_id] = header->seq_id; seq_id_offset[antenna_id] = (seq_id + 256 - tx_seq_id) % 256;
iq_packet = (ecpri_iq_packet*) (rx_buf + j * MAX_PACKET_SIZE); iq_packet = (ecpri_iq_packet*) (rx_buf + j * MAX_PACKET_SIZE);
*((uint16_t *) (trxr_rbuf[antenna_id].buffer)) = ntohs(iq_packet->payload_size); payload_size = ntohs(iq_packet->payload_size);
memcpy(trxr_rbuf[antenna_id].buffer + 2, *((uint16_t *) rbuf_write(&trxr_rbuf[antenna_id])) = payload_size;
memcpy(rbuf_write(&trxr_rbuf[antenna_id]) + 2,
iq_packet->iq_samples, iq_packet->iq_samples,
ntohs(iq_packet->payload_size)); payload_size);
rbuf_increment_write(&trxr_rbuf[antenna_id], trxr_rbuf[antenna_id].block_len); rbuf_increment_write(&trxr_rbuf[antenna_id], trxr_rbuf[antenna_id].block_len);
update_counter(&recv_counter[antenna_id], ntohs(iq_packet->payload_size) / 4); update_counter(&recv_counter[antenna_id], payload_size / 4);
update_counter(&rx_packet_counter, 1); update_counter(&rx_packet_counter, 1);
} }
if(stop) { if(stop) {
...@@ -657,7 +686,7 @@ static void *send_thread(void *p) { ...@@ -657,7 +686,7 @@ static void *send_thread(void *p) {
error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret); error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret);
j += ret; j += ret;
update_counter(&sent_counter, ret * TX_IQ_PAYLOAD / 4); update_counter(&sent_counter, ret * TX_IQ_PAYLOAD / 4);
update_counter(&tx_packet_counter, ret * 4); update_counter(&tx_packet_counter, ret);
} }
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
...@@ -679,6 +708,9 @@ static void *statistic_thread(void *p) { ...@@ -679,6 +708,9 @@ static void *statistic_thread(void *p) {
if(!stats_file_desc) if(!stats_file_desc)
error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name); error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name);
for(int i = 0; i < 10; i++)
fprintf(stats_file_desc, "\n");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->statistic_affinity, &mask); CPU_SET(s->statistic_affinity, &mask);
...@@ -688,7 +720,7 @@ static void *statistic_thread(void *p) { ...@@ -688,7 +720,7 @@ static void *statistic_thread(void *p) {
clock_gettime(CLOCK_TAI, &initial); clock_gettime(CLOCK_TAI, &initial);
next = initial; next = initial;
for(int64_t i = 0;; i++) { for(int64_t i = 0;; i++) {
add_ns(&next, s->statistics_refresh_rate_ns); add_ns(&next, ((int64_t) s->statistics_refresh_rate_ms) * 1000000L);
print_stats(stats_file_desc, (i % 17) == 0); print_stats(stats_file_desc, (i % 17) == 0);
#ifdef DEBUG #ifdef DEBUG
//print_debug(stats_file_desc, ((i + 9) % 17) == 0); //print_debug(stats_file_desc, ((i + 9) % 17) == 0);
...@@ -697,10 +729,8 @@ static void *statistic_thread(void *p) { ...@@ -697,10 +729,8 @@ static void *statistic_thread(void *p) {
update_counter_freq(&rx_drop_counter); update_counter_freq(&rx_drop_counter);
update_counter_freq(&tx_drop_counter); update_counter_freq(&tx_drop_counter);
update_counter_freq(&recv_counter[0]); for(int i = 0; i < MAX_CHANNELS; i++)
update_counter_freq(&recv_counter[1]); update_counter_freq(&recv_counter[i]);
update_counter_freq(&recv_counter[2]);
update_counter_freq(&recv_counter[3]);
update_counter_freq(&read_counter); update_counter_freq(&read_counter);
update_counter_freq(&write_counter); update_counter_freq(&write_counter);
update_counter_freq(&sent_counter); update_counter_freq(&sent_counter);
...@@ -800,14 +830,15 @@ int start(TRXEcpriState * s) { ...@@ -800,14 +830,15 @@ int start(TRXEcpriState * s) {
//set_latency_target(); //set_latency_target();
for(int i = 0; i < MAX_CHANNELS; i++)
seq_id_offset[i] = 0;
tx_seq_id = 0; tx_seq_id = 0;
init_counter(&rx_drop_counter); init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter); init_counter(&tx_drop_counter);
init_counter(&lost_rx_counter); for(int i = 0; i < MAX_CHANNELS; i++) {
init_counter(&recv_counter[0]); init_counter(&lost_rx_counter[i]);
init_counter(&recv_counter[1]); init_counter(&recv_counter[i]);
init_counter(&recv_counter[2]); }
init_counter(&recv_counter[3]);
init_counter(&read_counter); init_counter(&read_counter);
init_counter(&write_counter); init_counter(&write_counter);
init_counter(&sent_counter); init_counter(&sent_counter);
...@@ -967,12 +998,12 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -967,12 +998,12 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
for(int i = 0; i < s->tx_n_channel; i++) { for(int i = 0; i < s->tx_n_channel; i++) {
if(__samples) if(__samples)
memcpy( memcpy(
((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index, rbuf_write(&trxw_rbuf[i]),
((uint8_t *) _samples[i]) + offset, ((uint8_t *) _samples[i]) + offset,
nc); nc);
else else
memset( memset(
((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index, rbuf_write(&trxw_rbuf[i]),
0, 0,
nc); nc);
rbuf_increment_write(&trxw_rbuf[i], nc); rbuf_increment_write(&trxw_rbuf[i], nc);
...@@ -1000,16 +1031,23 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa ...@@ -1000,16 +1031,23 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
float ** _samples = (float **) __samples; float ** _samples = (float **) __samples;
TRXEcpriState *s = s1->opaque; TRXEcpriState *s = s1->opaque;
//log_debug("TRX_ECPRI_READ", "Read %d samples", count);
for(int i = 0; i < s->rx_n_channel; i++ ) { for(int i = 0; i < s->rx_n_channel; i++ ) {
offset = 0; offset = 0;
count_left = count; count_left = count;
while(count_left) { while(count_left) {
ra = rbuf_read_amount(&trxr_rbuf[i]); ra = rbuf_read_amount(&trxr_rbuf[i]) / trxr_rbuf[i].block_len;
if(!ra) //log_debug("TRX_ECPRI_READ", "ra = %ld / %ld = %ld",
// rbuf_read_amount(&trxr_rbuf[i]), trxr_rbuf[i].block_len, ra);
if(!ra) {
usleep(100); usleep(100);
continue;
}
for(int j = 0; j < ra; j++) { for(int j = 0; j < ra; j++) {
data = trxr_rbuf[i].buffer + trxr_rbuf[i].read_index; data = rbuf_read(&trxr_rbuf[i]);
payload_size = *((uint16_t *) data); payload_size = *((uint16_t *) data);
//log_debug("TRX_ECPRI_READ", "payload = %d", payload_size);
if(count_left < payload_size / 4) if(count_left < payload_size / 4)
payload_size = count_left * 4; payload_size = count_left * 4;
memcpy( memcpy(
...@@ -1270,8 +1308,10 @@ int trx_driver_init(TRXState *s1) ...@@ -1270,8 +1308,10 @@ int trx_driver_init(TRXState *s1)
s->rx_n_channel = (int) val; s->rx_n_channel = (int) val;
trx_get_param_double(s1, &val, "tx_n_channel"); trx_get_param_double(s1, &val, "tx_n_channel");
s->tx_n_channel = (int) val; s->tx_n_channel = (int) val;
trx_get_param_double(s1, &val, "statistics_refresh_rate_ns"); trx_get_param_double(s1, &val, "statistics_refresh_rate_ms");
s->statistics_refresh_rate_ns = (int) val; s->statistics_refresh_rate_ms = (int) val;
trx_get_param_double(s1, &val, "rx_drop_pcm");
s->rx_drop_pcm = (int) val;
if(s->rx_n_channel == 0) if(s->rx_n_channel == 0)
log_exit("TRX_ECPRI", "rx_n_channel parameter can't be null\n"); log_exit("TRX_ECPRI", "rx_n_channel parameter can't be null\n");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment