Commit 9d1ff60f authored by Joanne Hugé's avatar Joanne Hugé

recv2

parent 29932644
......@@ -39,6 +39,7 @@
//#define DISABLE_WRITE
//#define DISABLE_RECV
//#define DISABLE_READ
#define DISABLE_RECV2
#define RX_PPS_MODE
......@@ -134,6 +135,7 @@ typedef struct {
const char * bbu_if;
const char * log_directory;
int recv_affinity;
int recv2_affinity;
int send_affinity;
int statistic_affinity;
......@@ -203,6 +205,7 @@ typedef struct {
static ring_buffer_t trxr_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxw_rbuf[MAX_CHANNELS]; // Uncompressed IQ samples
static uint8_t rx_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t rx2_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t tx_buf[MAX_PACKET_SIZE * MAX_TX_BURST];
// Counters
......@@ -214,6 +217,7 @@ static counter_stat_t rx_drop_counter; // rx packets dropped by driver
static counter_stat_t tx_drop_counter; // tx packets dropped by driver
static counter_stat_t lost_rx_counter[MAX_CHANNELS]; // gaps between seq_id's
static counter_stat_t rx_packet_counter; // packets received from RRH
static counter_stat_t rx2_packet_counter; // packets received from RRH
static counter_stat_t tx_packet_counter; // packets sent from TRX
// Network
......@@ -322,7 +326,7 @@ static void print_stats(FILE * f, int print_header) {
int offset = 0;
if(print_header) {
fprintf(f,
"%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n",
"%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n",
"rxdrop",
"txdrop",
"lstrx0",
......@@ -341,6 +345,7 @@ static void print_stats(FILE * f, int print_header) {
"wrte f",
"sent f",
"rx pkt",
"rx2pkt",
"tx pkt",
"rx pps",
"tx pps",
......@@ -372,8 +377,9 @@ static void print_stats(FILE * f, int print_header) {
int64_to_6s(s + 7 * offset++, write_counter.freq);
int64_to_6s(s + 7 * offset++, sent_counter.freq);
int64_to_6s(s + 7 * offset++, rx_packet_counter.counter);
int64_to_6s(s + 7 * offset++, rx2_packet_counter.counter);
int64_to_6s(s + 7 * offset++, tx_packet_counter.counter);
int64_to_6s(s + 7 * offset++, rx_packet_counter.freq);
int64_to_6s(s + 7 * offset++, rx_packet_counter.freq + rx2_packet_counter.freq);
int64_to_6s(s + 7 * offset++, tx_packet_counter.freq);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[0]) / 4);
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxr_rbuf[1]) / 4);
......@@ -382,7 +388,7 @@ static void print_stats(FILE * f, int print_header) {
int64_to_6s(s + 7 * offset++, rbuf_write_amount(&trxw_rbuf[0]) / 4);
offset = 0;
fprintf(f,
"%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n",
"%6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s %6s\n",
s + 7 * 0,
s + 7 * 1,
s + 7 * 2,
......@@ -408,7 +414,8 @@ static void print_stats(FILE * f, int print_header) {
s + 7 * 22,
s + 7 * 23,
s + 7 * 24,
s + 7 * 25);
s + 7 * 25,
s + 7 * 26);
}
static void log_exit(const char * section, const char * msg, ...) {
......@@ -607,6 +614,45 @@ static void *recv_thread(void *p) {
pthread_exit(EXIT_SUCCESS);
}
static void *recv2_thread(void *p) {
#ifdef DISABLE_RECV
pthread_exit(EXIT_SUCCESS);
#endif
cpu_set_t mask;
struct mmsghdr msgh[MAX_RX_BURST];
struct iovec msgv[MAX_RX_BURST];
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv2_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno,
"Could not set CPU affinity to CPU %d\n", s->recv2_affinity);
for(int64_t i = 0;; i++) {
// Reset data structures for recv messages
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < s->rx_burst; j++) {
msgv[j].iov_base = rx2_buf + j * MAX_PACKET_SIZE;
msgv[j].iov_len = MAX_PACKET_SIZE;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
}
// Receive at most rx_burst messages
int ret = recvmmsg(recv_sockfd, msgh, s->rx_burst, 0, NULL);
if(ret <= -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
update_counter(&rx2_packet_counter, ret);
}
pthread_exit(EXIT_SUCCESS);
}
// Send as soon as packets are encoded
static void *send_thread(void *p) {
#ifdef DISABLE_SEND
......@@ -749,6 +795,7 @@ static void *statistic_thread(void *p) {
update_counter_freq(&write_counter);
update_counter_freq(&sent_counter);
update_counter_freq(&rx_packet_counter);
update_counter_freq(&rx2_packet_counter);
update_counter_freq(&tx_packet_counter);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
......@@ -758,12 +805,15 @@ static void *statistic_thread(void *p) {
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t recv2_pthread;
pthread_t send_pthread;
pthread_t statistic_pthread;
struct sched_param recv_param;
struct sched_param recv2_param;
struct sched_param send_param;
struct sched_param statistic_param;
pthread_attr_t recv_attr;
pthread_attr_t recv2_attr;
pthread_attr_t send_attr;
pthread_attr_t statistic_attr;
......@@ -785,6 +835,18 @@ static int start_threads(TRXEcpriState * s) {
if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&recv2_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&recv2_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&recv2_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv2_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv2_attr, &recv2_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&recv2_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN))
......@@ -817,6 +879,11 @@ static int start_threads(TRXEcpriState * s) {
usleep(1000 * 500);
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
usleep(1000 * 500);
#ifndef DISABLE_RECV2
if (pthread_create(&recv2_pthread, NULL, recv2_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv2 thread");
#endif
return 0;
}
......@@ -857,6 +924,7 @@ int start(TRXEcpriState * s) {
init_counter(&write_counter);
init_counter(&sent_counter);
init_counter(&rx_packet_counter);
init_counter(&rx2_packet_counter);
init_counter(&tx_packet_counter);
for(int i = 0; i < s->tx_n_channel; i++) {
......@@ -1312,6 +1380,8 @@ int trx_driver_init(TRXState *s1)
trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "recv2_affinity");
s->recv2_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "statistic_affinity");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment