Commit 41849189 authored by Joanne Hugé's avatar Joanne Hugé

wip: multi-thread recv

parent 9d1ff60f
......@@ -39,7 +39,7 @@
//#define DISABLE_WRITE
//#define DISABLE_RECV
//#define DISABLE_READ
#define DISABLE_RECV2
//#define DISABLE_RECV2
#define RX_PPS_MODE
......@@ -129,6 +129,13 @@
*/
typedef struct {
int64_t counter;
int64_t freq_counter;
int64_t freq_ts;
int64_t freq;
} counter_stat_t;
typedef struct {
const char * rrh_mac;
const char * bbu_mac;
......@@ -136,6 +143,8 @@ typedef struct {
const char * log_directory;
int recv_affinity;
int recv2_affinity;
int recv3_affinity;
int recv4_affinity;
int send_affinity;
int statistic_affinity;
......@@ -152,15 +161,11 @@ typedef struct {
int statistics_refresh_rate_ms;
int sample_rate;
int rx_drop_pcm;
uint8_t * rx_buf;
ring_buffer_t trxr_rbuf[MAX_CHANNELS];
counter_stat_t * rx_packet_counter; // packets received from RRH
} TRXEcpriState;
typedef struct {
int64_t counter;
int64_t freq_counter;
int64_t freq_ts;
int64_t freq;
} counter_stat_t;
typedef struct {
uint8_t mac_dst[6];
uint8_t mac_src[6];
......@@ -203,11 +208,21 @@ typedef struct {
// Buffers
static ring_buffer_t trxr_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxr2_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxr3_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxr4_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxw_rbuf[MAX_CHANNELS]; // Uncompressed IQ samples
static uint8_t rx_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t rx2_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t rx3_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t rx4_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t tx_buf[MAX_PACKET_SIZE * MAX_TX_BURST];
static TRXEcpriState s_recv;
static TRXEcpriState s_recv2;
static TRXEcpriState s_recv3;
static TRXEcpriState s_recv4;
// Counters
static counter_stat_t recv_counter[MAX_CHANNELS]; // IQs received from RRH
static counter_stat_t read_counter; // IQs passed to amarisoft stack
......@@ -218,6 +233,8 @@ static counter_stat_t tx_drop_counter; // tx packets dropped by driver
static counter_stat_t lost_rx_counter[MAX_CHANNELS]; // gaps between seq_id's
static counter_stat_t rx_packet_counter; // packets received from RRH
static counter_stat_t rx2_packet_counter; // packets received from RRH
static counter_stat_t rx3_packet_counter; // packets received from RRH
static counter_stat_t rx4_packet_counter; // packets received from RRH
static counter_stat_t tx_packet_counter; // packets sent from TRX
// Network
......@@ -377,9 +394,9 @@ static void print_stats(FILE * f, int print_header) {
int64_to_6s(s + 7 * offset++, write_counter.freq);
int64_to_6s(s + 7 * offset++, sent_counter.freq);
int64_to_6s(s + 7 * offset++, rx_packet_counter.counter);
int64_to_6s(s + 7 * offset++, rx2_packet_counter.counter);
int64_to_6s(s + 7 * offset++, rx2_packet_counter.counter + rx3_packet_counter.counter + rx4_packet_counter.counter);
int64_to_6s(s + 7 * offset++, tx_packet_counter.counter);
int64_to_6s(s + 7 * offset++, rx_packet_counter.freq + rx2_packet_counter.freq);
int64_to_6s(s + 7 * offset++, rx_packet_counter.freq + rx2_packet_counter.freq + rx3_packet_counter.freq + rx4_packet_counter.freq);
int64_to_6s(s + 7 * offset++, tx_packet_counter.freq);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[0]) / 4);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[1]) / 4);
......@@ -520,7 +537,7 @@ static void *recv_thread(void *p) {
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < s->rx_burst; j++) {
msgv[j].iov_base = rx_buf + j * MAX_PACKET_SIZE;
msgv[j].iov_base = s->rx_buf + j * MAX_PACKET_SIZE;
msgv[j].iov_len = MAX_PACKET_SIZE;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
......@@ -531,14 +548,14 @@ static void *recv_thread(void *p) {
if(ret <= -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
#ifdef RX_PPS_MODE
update_counter(&rx_packet_counter, ret);
update_counter(s->rx_packet_counter, ret);
continue;
#endif
if((i % 100000) < s->rx_drop_pcm)
continue;
// Process each received message
for(int j = 0; j < ret; j++) {
header = (ecpri_header*) (rx_buf + j * MAX_PACKET_SIZE);
header = (ecpri_header*) (s->rx_buf + j * MAX_PACKET_SIZE);
// Discard packet if it is not eCPRI
if(header->ether_type != 0xfeae)
......@@ -551,7 +568,7 @@ static void *recv_thread(void *p) {
}
// If packet is a timing packet
if(header->ecpri_type == 255) {
timing_packet = (ecpri_timing_packet*) (rx_buf + j * MAX_PACKET_SIZE);
timing_packet = (ecpri_timing_packet*) (s->rx_buf + j * MAX_PACKET_SIZE);
log_info("RECV_THREAD", "GPS TIME: %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
timing_packet->gps_time[0],
timing_packet->gps_time[1],
......@@ -585,27 +602,27 @@ static void *recv_thread(void *p) {
stop = 1; break;
}
// Exit if there is no more space in the buffer
if(rbuf_write_amount(&trxr_rbuf[antenna_id]) <= trxr_rbuf[antenna_id].block_len) {
//rbuf_increment_read(&trxr_rbuf[antenna_id], trxr_rbuf[antenna_id].block_len);
if(rbuf_write_amount(&s->trxr_rbuf[antenna_id]) <= s->trxr_rbuf[antenna_id].block_len) {
//rbuf_increment_read(s->trxr_rbuf[antenna_id], s->trxr_rbuf[antenna_id].block_len);
//update_counter(&rx_drop_counter, 1);
log_exit("RECV_THREAD", "No more space in %s buffer",
trxr_rbuf[antenna_id].name);
s->trxr_rbuf[antenna_id].name);
}
prev_seq_id[antenna_id] = seq_id;
seq_id_offset[antenna_id] = (seq_id + 256 - tx_seq_id) % 256;
iq_packet = (ecpri_iq_packet*) (rx_buf + j * MAX_PACKET_SIZE);
iq_packet = (ecpri_iq_packet*) (s->rx_buf + j * MAX_PACKET_SIZE);
payload_size = ntohs(iq_packet->payload_size);
*((uint16_t *) rbuf_write(&trxr_rbuf[antenna_id])) = payload_size;
memcpy(rbuf_write(&trxr_rbuf[antenna_id]) + 2,
*((uint16_t *) rbuf_write(&s->trxr_rbuf[antenna_id])) = payload_size;
memcpy(rbuf_write(&s->trxr_rbuf[antenna_id]) + 2,
iq_packet->iq_samples,
payload_size);
rbuf_increment_write(&trxr_rbuf[antenna_id], trxr_rbuf[antenna_id].block_len);
rbuf_increment_write(&s->trxr_rbuf[antenna_id], s->trxr_rbuf[antenna_id].block_len);
update_counter(&recv_counter[antenna_id], payload_size / 4);
update_counter(&rx_packet_counter, 1);
update_counter(s->rx_packet_counter, 1);
}
if(stop) {
for(int j = 0; j < ret; j++) {
header = (ecpri_header*) (rx_buf + j * MAX_PACKET_SIZE);
header = (ecpri_header*) (s->rx_buf + j * MAX_PACKET_SIZE);
print_packet(header);
}
log_exit("RECV_THREAD", "Exiting");
......@@ -614,45 +631,6 @@ static void *recv_thread(void *p) {
pthread_exit(EXIT_SUCCESS);
}
static void *recv2_thread(void *p) {
#ifdef DISABLE_RECV
pthread_exit(EXIT_SUCCESS);
#endif
cpu_set_t mask;
struct mmsghdr msgh[MAX_RX_BURST];
struct iovec msgv[MAX_RX_BURST];
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv2_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno,
"Could not set CPU affinity to CPU %d\n", s->recv2_affinity);
for(int64_t i = 0;; i++) {
// Reset data structures for recv messages
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < s->rx_burst; j++) {
msgv[j].iov_base = rx2_buf + j * MAX_PACKET_SIZE;
msgv[j].iov_len = MAX_PACKET_SIZE;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
}
// Receive at most rx_burst messages
int ret = recvmmsg(recv_sockfd, msgh, s->rx_burst, 0, NULL);
if(ret <= -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
update_counter(&rx2_packet_counter, ret);
}
pthread_exit(EXIT_SUCCESS);
}
// Send as soon as packets are encoded
static void *send_thread(void *p) {
#ifdef DISABLE_SEND
......@@ -796,6 +774,8 @@ static void *statistic_thread(void *p) {
update_counter_freq(&sent_counter);
update_counter_freq(&rx_packet_counter);
update_counter_freq(&rx2_packet_counter);
update_counter_freq(&rx3_packet_counter);
update_counter_freq(&rx4_packet_counter);
update_counter_freq(&tx_packet_counter);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
......@@ -806,17 +786,42 @@ static void *statistic_thread(void *p) {
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t recv2_pthread;
pthread_t recv3_pthread;
pthread_t recv4_pthread;
pthread_t send_pthread;
pthread_t statistic_pthread;
struct sched_param recv_param;
struct sched_param recv2_param;
struct sched_param recv3_param;
struct sched_param recv4_param;
struct sched_param send_param;
struct sched_param statistic_param;
pthread_attr_t recv_attr;
pthread_attr_t recv2_attr;
pthread_attr_t recv3_attr;
pthread_attr_t recv4_attr;
pthread_attr_t send_attr;
pthread_attr_t statistic_attr;
memcpy((uint8_t*) &s_recv, s, sizeof(TRXEcpriState));
memcpy((uint8_t*) &s_recv2, s, sizeof(TRXEcpriState));
memcpy((uint8_t*) &s_recv3, s, sizeof(TRXEcpriState));
memcpy((uint8_t*) &s_recv4, s, sizeof(TRXEcpriState));
s_recv.rx_buf = rx_buf;
s_recv2.rx_buf = rx2_buf;
s_recv3.rx_buf = rx3_buf;
s_recv4.rx_buf = rx4_buf;
for(int i = 0; i < MAX_CHANNELS; i++) {
s_recv.trxr_rbuf[i] = trxr_rbuf[i];
s_recv2.trxr_rbuf[i] = trxr2_rbuf[i];
s_recv3.trxr_rbuf[i] = trxr3_rbuf[i];
s_recv4.trxr_rbuf[i] = trxr4_rbuf[i];
}
s_recv.rx_packet_counter = &rx_packet_counter;
s_recv2.rx_packet_counter = &rx2_packet_counter;
s_recv3.rx_packet_counter = &rx3_packet_counter;
s_recv4.rx_packet_counter = &rx4_packet_counter;
log_info("TRX_ECPRI", "Starting threads");
// Initialize pthread attributes (default values)
......@@ -846,6 +851,28 @@ static int start_threads(TRXEcpriState * s) {
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&recv2_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&recv3_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&recv3_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&recv3_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv3_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv3_attr, &recv3_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&recv3_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&recv4_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&recv4_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&recv4_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv4_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv4_attr, &recv4_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&recv4_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
......@@ -881,8 +908,14 @@ static int start_threads(TRXEcpriState * s) {
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
usleep(1000 * 500);
#ifndef DISABLE_RECV2
if (pthread_create(&recv2_pthread, NULL, recv2_thread, s))
if (pthread_create(&recv2_pthread, NULL, recv_thread, &s_recv2))
error(EXIT_FAILURE, errno, "Couldn't create recv2 thread");
usleep(1000 * 500);
if (pthread_create(&recv3_pthread, NULL, recv_thread, &s_recv3))
error(EXIT_FAILURE, errno, "Couldn't create recv3 thread");
usleep(1000 * 500);
if (pthread_create(&recv4_pthread, NULL, recv_thread, &s_recv4))
error(EXIT_FAILURE, errno, "Couldn't create recv4 thread");
#endif
return 0;
......@@ -925,6 +958,8 @@ int start(TRXEcpriState * s) {
init_counter(&sent_counter);
init_counter(&rx_packet_counter);
init_counter(&rx2_packet_counter);
init_counter(&rx3_packet_counter);
init_counter(&rx4_packet_counter);
init_counter(&tx_packet_counter);
for(int i = 0; i < s->tx_n_channel; i++) {
......@@ -1101,6 +1136,37 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
update_counter(&write_counter, (s->tx_n_channel * count));
}
static int read_trxr(ring_buffer_t * rbuf, float * samples, int count_left, int offset) {
uint8_t * data;
int payload_size;
int leftover_payload;
int ra = rbuf_read_amount(rbuf) / rbuf->block_len;
int read = 0;
//log_debug("TRX_ECPRI_READ", "ra = %ld / %ld = %ld",
// rbuf_read_amount(rbuf), rbuf->block_len, ra);
for(int j = 0; j < ra; j++) {
data = rbuf_read(rbuf);
payload_size = *((uint16_t *) data);
//log_debug("TRX_ECPRI_READ", "payload = %d", payload_size);
leftover_payload = payload_size - (count_left * 4);
if(leftover_payload > 0)
payload_size = count_left * 4;
memcpy(((uint8_t*) samples) + offset, data + 2, payload_size);
read += payload_size / 4;
if(leftover_payload > 0) {
// Shift
*((uint16_t *) data) = leftover_payload;
for(int k = 0; k < leftover_payload; k++)
data[2 + k] = data[2 + payload_size + k];
}
count_left -= payload_size / 4;
if(!count_left)
break;
offset += payload_size;
}
return read;
}
/*
Callback for sending TRX samples to Amarisoft
Put count IQ samples from TRX read buffer in __samples, return as soon as possible
......@@ -1112,9 +1178,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples,
int count, int rx_port_index, TRXReadMetadata *md)
{
int64_t count_left, offset, ra;
uint8_t * data;
uint16_t payload_size;
int64_t count_left, offset, read;
float ** _samples = (float **) __samples;
TRXEcpriState *s = s1->opaque;
......@@ -1129,29 +1193,19 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
offset = 0;
count_left = count;
while(count_left) {
ra = rbuf_read_amount(&trxr_rbuf[i]) / trxr_rbuf[i].block_len;
//log_debug("TRX_ECPRI_READ", "ra = %ld / %ld = %ld",
// rbuf_read_amount(&trxr_rbuf[i]), trxr_rbuf[i].block_len, ra);
if(!ra) {
usleep(100);
read = read_trxr(&trxr_rbuf[i], _samples[i], count_left, offset);
if(!read)
read = read_trxr(&trxr2_rbuf[i], _samples[i], count_left, offset);
if(!read)
read = read_trxr(&trxr3_rbuf[i], _samples[i], count_left, offset);
if(!read)
read = read_trxr(&trxr4_rbuf[i], _samples[i], count_left, offset);
if(!read) {
usleep(10);
continue;
}
for(int j = 0; j < ra; j++) {
data = rbuf_read(&trxr_rbuf[i]);
payload_size = *((uint16_t *) data);
//log_debug("TRX_ECPRI_READ", "payload = %d", payload_size);
if(count_left < payload_size / 4)
payload_size = count_left * 4;
memcpy(
((uint8_t*) _samples[i]) + offset,
data + 2,
payload_size);
count_left -= payload_size / 4;
if(!count_left)
break;
offset += payload_size;
rbuf_increment_read(&trxr_rbuf[i], trxr_rbuf[i].block_len);
}
count_left -= read;
offset += read * 4;
}
}
*ptimestamp = read_counter.counter / s->rx_n_channel;
......@@ -1382,6 +1436,10 @@ int trx_driver_init(TRXState *s1)
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "recv2_affinity");
s->recv2_affinity = (int) val;
trx_get_param_double(s1, &val, "recv3_affinity");
s->recv3_affinity = (int) val;
trx_get_param_double(s1, &val, "recv4_affinity");
s->recv4_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "statistic_affinity");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment