Commit bb0042ad authored by Joanne Hugé's avatar Joanne Hugé

wip: try SISO

parent 41849189
......@@ -39,9 +39,9 @@
//#define DISABLE_WRITE
//#define DISABLE_RECV
//#define DISABLE_READ
//#define DISABLE_RECV2
#define DISABLE_RECV2
#define RX_PPS_MODE
//#define RX_PPS_MODE
#ifdef RX_PPS_MODE
#define DISABLE_SEND
......@@ -162,7 +162,7 @@ typedef struct {
int sample_rate;
int rx_drop_pcm;
uint8_t * rx_buf;
ring_buffer_t trxr_rbuf[MAX_CHANNELS];
ring_buffer_t * trxr_rbuf[MAX_CHANNELS];
counter_stat_t * rx_packet_counter; // packets received from RRH
} TRXEcpriState;
......@@ -518,12 +518,14 @@ static void *recv_thread(void *p) {
uint8_t seq_id;
struct mmsghdr msgh[MAX_RX_BURST];
struct iovec msgv[MAX_RX_BURST];
struct timespec t1;
struct timespec t2;
ecpri_header * header;
ecpri_iq_packet * iq_packet;
ecpri_timing_packet * timing_packet;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("RECV_THREAD", "Thread init");
log_info("RECV_THREAD", "Thread init (Affinity %d)", s->recv_affinity);
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
......@@ -544,11 +546,21 @@ static void *recv_thread(void *p) {
}
// Receive at most rx_burst messages
#ifdef RX_PPS_MODE
clock_gettime(CLOCK_TAI, &t1);
#endif
int ret = recvmmsg(recv_sockfd, msgh, s->rx_burst, 0, NULL);
if(ret <= -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
#ifdef RX_PPS_MODE
update_counter(s->rx_packet_counter, ret);
if(ret) {
clock_gettime(CLOCK_TAI, &t2);
if(calcdiff_ns(t2, t1) > 10 * 1000000)
update_counter(&write_counter, 1);
else
update_counter(&sent_counter, 1);
update_counter(s->rx_packet_counter, ret);
}
continue;
#endif
if((i % 100000) < s->rx_drop_pcm)
......@@ -602,21 +614,23 @@ static void *recv_thread(void *p) {
stop = 1; break;
}
// Exit if there is no more space in the buffer
if(rbuf_write_amount(&s->trxr_rbuf[antenna_id]) <= s->trxr_rbuf[antenna_id].block_len) {
//rbuf_increment_read(s->trxr_rbuf[antenna_id], s->trxr_rbuf[antenna_id].block_len);
if(rbuf_write_amount(s->trxr_rbuf[antenna_id]) <= s->trxr_rbuf[antenna_id]->block_len) {
//rbuf_increment_read(s->trxr_rbuf[antenna_id], s->trxr_rbuf[antenna_id]->block_len);
//update_counter(&rx_drop_counter, 1);
log_exit("RECV_THREAD", "No more space in %s buffer",
s->trxr_rbuf[antenna_id].name);
log_exit("RECV_THREAD", "No more space in %s buffer (%ld < %ld)",
s->trxr_rbuf[antenna_id]->name,
rbuf_write_amount(s->trxr_rbuf[antenna_id]),
s->trxr_rbuf[antenna_id]->block_len);
}
prev_seq_id[antenna_id] = seq_id;
seq_id_offset[antenna_id] = (seq_id + 256 - tx_seq_id) % 256;
iq_packet = (ecpri_iq_packet*) (s->rx_buf + j * MAX_PACKET_SIZE);
payload_size = ntohs(iq_packet->payload_size);
*((uint16_t *) rbuf_write(&s->trxr_rbuf[antenna_id])) = payload_size;
memcpy(rbuf_write(&s->trxr_rbuf[antenna_id]) + 2,
*((uint16_t *) rbuf_write(s->trxr_rbuf[antenna_id])) = payload_size;
memcpy(rbuf_write(s->trxr_rbuf[antenna_id]) + 2,
iq_packet->iq_samples,
payload_size);
rbuf_increment_write(&s->trxr_rbuf[antenna_id], s->trxr_rbuf[antenna_id].block_len);
rbuf_increment_write(s->trxr_rbuf[antenna_id], s->trxr_rbuf[antenna_id]->block_len);
update_counter(&recv_counter[antenna_id], payload_size / 4);
update_counter(s->rx_packet_counter, 1);
}
......@@ -807,15 +821,19 @@ static int start_threads(TRXEcpriState * s) {
memcpy((uint8_t*) &s_recv2, s, sizeof(TRXEcpriState));
memcpy((uint8_t*) &s_recv3, s, sizeof(TRXEcpriState));
memcpy((uint8_t*) &s_recv4, s, sizeof(TRXEcpriState));
s_recv.recv_affinity = s->recv_affinity;
s_recv2.recv_affinity = s->recv2_affinity;
s_recv3.recv_affinity = s->recv3_affinity;
s_recv4.recv_affinity = s->recv4_affinity;
s_recv.rx_buf = rx_buf;
s_recv2.rx_buf = rx2_buf;
s_recv3.rx_buf = rx3_buf;
s_recv4.rx_buf = rx4_buf;
for(int i = 0; i < MAX_CHANNELS; i++) {
s_recv.trxr_rbuf[i] = trxr_rbuf[i];
s_recv2.trxr_rbuf[i] = trxr2_rbuf[i];
s_recv3.trxr_rbuf[i] = trxr3_rbuf[i];
s_recv4.trxr_rbuf[i] = trxr4_rbuf[i];
s_recv.trxr_rbuf[i] = &trxr_rbuf[i];
s_recv2.trxr_rbuf[i] = &trxr2_rbuf[i];
s_recv3.trxr_rbuf[i] = &trxr3_rbuf[i];
s_recv4.trxr_rbuf[i] = &trxr4_rbuf[i];
}
s_recv.rx_packet_counter = &rx_packet_counter;
s_recv2.rx_packet_counter = &rx2_packet_counter;
......@@ -904,7 +922,7 @@ static int start_threads(TRXEcpriState * s) {
if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread");
usleep(1000 * 500);
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
if (pthread_create(&recv_pthread, NULL, recv_thread, &s_recv))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
usleep(1000 * 500);
#ifndef DISABLE_RECV2
......@@ -971,6 +989,14 @@ int start(TRXEcpriState * s) {
char name[256];
sprintf(name, "TRXRead Ring Buffer %d", i);
init_rbuf(&trxr_rbuf[i], name, s->trxr_buf_size, MAX_IQ_PAYLOAD);
#ifndef DISABLE_RECV2
sprintf(name, "TRXRead2 Ring Buffer %d", i);
init_rbuf(&trxr2_rbuf[i], name, s->trxr_buf_size, MAX_IQ_PAYLOAD);
sprintf(name, "TRXRead3 Ring Buffer %d", i);
init_rbuf(&trxr3_rbuf[i], name, s->trxr_buf_size, MAX_IQ_PAYLOAD);
sprintf(name, "TRXRead4 Ring Buffer %d", i);
init_rbuf(&trxr4_rbuf[i], name, s->trxr_buf_size, MAX_IQ_PAYLOAD);
#endif
}
if (!(if_index = if_nametoindex(s->bbu_if))) {
......@@ -1158,6 +1184,8 @@ static int read_trxr(ring_buffer_t * rbuf, float * samples, int count_left, int
*((uint16_t *) data) = leftover_payload;
for(int k = 0; k < leftover_payload; k++)
data[2 + k] = data[2 + payload_size + k];
} else {
rbuf_increment_read(rbuf, rbuf->block_len);
}
count_left -= payload_size / 4;
if(!count_left)
......@@ -1194,12 +1222,14 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
count_left = count;
while(count_left) {
read = read_trxr(&trxr_rbuf[i], _samples[i], count_left, offset);
#ifndef DISABLE_RECV2
if(!read)
read = read_trxr(&trxr2_rbuf[i], _samples[i], count_left, offset);
if(!read)
read = read_trxr(&trxr3_rbuf[i], _samples[i], count_left, offset);
if(!read)
read = read_trxr(&trxr4_rbuf[i], _samples[i], count_left, offset);
#endif
if(!read) {
usleep(10);
continue;
......
......@@ -150,6 +150,9 @@ static struct timespec timestamp_list[50];
static int timestamp_index = 0;
void timer(const char * title) {
if(timestamp_index >= 49) {
timestamp_index = 0;
}
clock_gettime(CLOCK_TAI, &timestamp_list[timestamp_index++]);
if(timestamp_index > 1) {
printf("%20s (%3d) - %ld ns\n", title,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment