Commit 4f96c487 authored by Joanne Hugé's avatar Joanne Hugé

WIP

parent 6953fc8e
......@@ -59,6 +59,21 @@ static void init_rbuf(ring_buffer_t * rbuf, char * name, size_t buf_len,
strcpy(rbuf->name, name);
}
static void check_rbuf_read(ring_buffer_t * rbuf) {
if ((rbuf->read_index % block_len))
log_exit("CHECK_RBUF",
"read index needs to be a multiple of block_len"
" (%d is not a multiple of %d)",
rbuf->read_index, block_len);
}
static void check_rbuf_write(ring_buffer_t * rbuf) {
if ((rbuf->write_index % block_len))
log_exit("CHECK_RBUF",
"write index needs to be a multiple of block_len"
" (%d is not a multiple of %d)",
rbuf->write, block_len);
}
static int rbuf_read_amount(ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
}
......
......@@ -33,16 +33,17 @@
#include "private/trx_driver.h"
#define DEBUG
//#define DISABLE_SEND
//#define DISABLE_RECV
//#define DISABLE_DECODE
#define DISABLE_SEND
//#define DISABLE_ENCODE
#define DISABLE_RECV
#define DISABLE_DECODE
#include "utils.c"
#include "ring_buffer.c"
// Update period for packets per second statistic counter
#define PPS_UPDATE_PERIOD INT64_C(1000000000)
#define STAT_INT_LEN "9"
#define STAT_INT_LEN "10"
#define N_SAMPLES (32)
#define MIN_PACKET_SIZE 64
......@@ -174,6 +175,66 @@ static struct sockaddr_ll connect_sk_addr;
static uint8_t ecpri_iq_header[ECPRI_IQ_HEADER + ORAN_HEADER];
static uint8_t packet_header[PACKET_HEADER]; // ethernet + ecpri + iq header
static void print_debug(FILE * f, int print_header) {
char buffer[200];
if(print_header) {
sprintf(buffer,
" "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"\n",
"RX RA",
"TRXR RA",
"TRXW RA",
"TX RA",
"RX WA",
"TRXR WA",
"TRXW WA",
"TX WA",
"TRXW RI",
"TRXW WI");
for(int i = 0; buffer[i] != '\0'; i++)
if(buffer[i] == ' ')
buffer[i] = '-';
fprintf(f, buffer);
}
sprintf(buffer,
" "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"\n",
rbuf_read_amount(&rx_rbuf) / rx_rbuf.block_len,
rbuf_read_amount(&trxr_rbuf[0]) / trxr_rbuf[0].block_len,
rbuf_read_amount(&trxw_rbuf[0]) / trxw_rbuf[0].block_len,
rbuf_read_amount(&tx_rbuf) / tx_rbuf.block_len,
rbuf_write_amount(&rx_rbuf) / rx_rbuf.block_len,
rbuf_write_amount(&trxr_rbuf[0]) / trxr_rbuf[0].block_len,
rbuf_write_amount(&trxw_rbuf[0]) / trxw_rbuf[0].block_len,
rbuf_write_amount(&tx_rbuf) / tx_rbuf.block_len,
trxw_rbuf[0].read_index / trxw_rbuf[0].block_len,
trxw_rbuf[0].write_index / trxw_rbuf[0].block_len);
for(int i = 0; buffer[i] != '\0'; i++)
if(buffer[i] == ' ')
buffer[i] = '-';
fprintf(f, buffer);
}
static void print_stats(FILE * f, int print_header) {
if(print_header) {
fprintf(f,
......@@ -256,6 +317,7 @@ static void log_exit(const char * section, const char * msg, ...) {
// Dump useful information
print_stats(stderr, 1);
print_debug(stderr, 1);
fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n",
tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf),
rbuf_write_amount(&tx_rbuf));
......@@ -322,6 +384,10 @@ static void *recv_thread(void *p) {
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
if(rbuf_write_amount(&rx_rbuf) < RX_BURST_SIZE * PACKET_SIZE)
log_error("RECV_THREAD",
"Not enough space in RX buffer (%d < %d)\n",
rbuf_write_amount(&rx_rbuf), RX_BURST_SIZE * PACKET_SIZE);
for(int j = 0; j < RX_BURST_SIZE; j++) {
msgv[j].iov_base = rbuf_write(&rx_rbuf) + j * rx_rbuf.block_len;
msgv[j].iov_len = rx_rbuf.block_len;
......@@ -335,8 +401,9 @@ static void *recv_thread(void *p) {
if(ret != RX_BURST_SIZE)
log_error("RECV_THREAD",
"recvmmsg received %d messages instead of %d\n", ret, RX_BURST_SIZE);
rbuf_increment_write(&rx_rbuf, RX_BURST_SIZE * rx_rbuf.block_len);
rbuf_increment_write(&rx_rbuf, RX_BURST_SIZE * PACKET_SIZE);
update_counter(&recv_counter, RX_BURST_SIZE);
check_rbuf_write(&rx_rbuf);
}
pthread_exit(EXIT_SUCCESS);
}
......@@ -375,14 +442,13 @@ static void *send_thread(void *p) {
for(int64_t i = 1;; i++) {
int to_send = rbuf_read_amount(&tx_rbuf);
int to_send = rbuf_read_amount(&tx_rbuf) / PACKET_SIZE;
if(to_send > TX_BURST_SIZE)
to_send = TX_BURST_SIZE;
for(int j = 0 ; j < to_send ; j++) {
msgv[j].iov_base = rbuf_read(&tx_rbuf);
msgv[j].iov_len = tx_rbuf.block_len;
rbuf_increment_read(&tx_rbuf, tx_rbuf.block_len);
msgv[j].iov_base = rbuf_read(&tx_rbuf) + j;
msgv[j].iov_len = PACKET_SIZE;
if(j > TX_BURST_SIZE)
log_exit("SEND_THREAD", "Too many burst packets");
}
......@@ -394,12 +460,14 @@ static void *send_thread(void *p) {
j += ret;
update_counter(&sent_counter, ret);
}
rbuf_increment_read(&tx_rbuf, PACKET_SIZE * to_send);
check_rbuf_read(&tx_rbuf);
}
pthread_exit(EXIT_SUCCESS);
}
static void *encode_thread(void *p) {
#ifdef DISABLE_SEND
#ifdef DISABLE_ENCODE
pthread_exit(EXIT_SUCCESS);
#endif
......@@ -444,26 +512,15 @@ static void *encode_thread(void *p) {
*word |= slot_id << 4 ;
*word |= symbol_id;
// 8832 bytes of IQ samples
// Add IQ_PAYLOAD to the current packet
int write_count;
int count_left = IQ_PAYLOAD;
while((write_count = rbuf_contiguous_copy(&trxw_rbuf[antenna_id], NULL,
count_left))) {
memcpy(data + PACKET_HEADER + IQ_PAYLOAD - count_left,
rbuf_read(&trxw_rbuf[antenna_id]),
write_count);
rbuf_increment_read(&trxw_rbuf[antenna_id], write_count);
rbuf_increment_write(&tx_rbuf, write_count);
rbuf_increment_read(&trxw_rbuf[0], write_count);
count_left -= write_count;
}
if(count_left)
exit(EXIT_FAILURE);
memcpy(data + PACKET_HEADER,
rbuf_read(&trxw_rbuf[antenna_id]),
IQ_PAYLOAD);
rbuf_increment_read(&trxw_rbuf[antenna_id], IQ_PAYLOAD);
rbuf_increment_write(&tx_rbuf, PACKET_SIZE);
check_rbuf_read(&trxw_rbuf[antenna_id]);
}
check_rbuf_write(&tx_rbuf);
// Increment counter once we wrote packets for all channels
update_counter(&encode_counter, 1);
......@@ -509,18 +566,12 @@ static void *decode_thread(void *p) {
uint16_t antenna_id = ntohs(*((uint16_t *) (data + j + 0)));
uint8_t seq_id = ((uint8_t) (*((uint16_t * ) (data + j + 2)) >> 8));
int write_count;
int count_left = IQ_PAYLOAD;
while((write_count = rbuf_contiguous_copy(NULL,
&trxr_rbuf[antenna_id], IQ_PAYLOAD))) {
memcpy(trxr_rbuf[antenna_id].buffer + (IQ_PAYLOAD - count_left),
data + PACKET_HEADER + IQ_PAYLOAD - count_left,
write_count);
rbuf_increment_write(&trxr_rbuf[antenna_id], write_count);
rbuf_increment_read(&rx_rbuf, write_count);
count_left -= write_count;
}
memcpy(trxr_rbuf[antenna_id].buffer,
data + PACKET_HEADER,
IQ_PAYLOAD);
rbuf_increment_write(&trxr_rbuf[antenna_id], IQ_PAYLOAD);
rbuf_increment_read(&rx_rbuf, PACKET_SIZE);
check_rbuf_write(&trxr_rbuf[antenna_id]);
update_counter(&decode_counter, 1);
}
pthread_exit(EXIT_SUCCESS);
......@@ -553,19 +604,9 @@ static void *statistic_thread(void *p) {
next = initial;
for(int64_t i = 0;; i++) {
add_ns(&next, s->statistics_refresh_rate_ns);
print_stats(stats_file_desc, (i % 50) == 0);
print_stats(stats_file_desc, (i % 17) == 0);
#ifdef DEBUG
fprintf(stats_file_desc,
"%d %d %d %d %d %d %d %d\n",
rx_rbuf.write_index,
rx_rbuf.read_index,
trxr_rbuf[0].write_index,
trxr_rbuf[0].read_index,
trxw_rbuf[0].write_index,
trxw_rbuf[0].read_index,
tx_rbuf.write_index,
tx_rbuf.read_index);
fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
print_debug(stats_file_desc, ((i + 9) % 17) == 0);
#endif
fflush(stats_file_desc);
......@@ -723,19 +764,19 @@ int start(TRXEcpriState * s) {
init_counter(&encode_counter);
init_counter(&sent_counter);
init_rbuf(&rx_rbuf, "RX ring buffer", s->txrx_buf_size, PACKET_SIZE);
init_rbuf(&tx_rbuf, "TX ring buffer", s->txrx_buf_size, PACKET_SIZE);
init_rbuf(&rx_rbuf, "RX ring buffer", s->txrx_buf_size, PACKET_SIZE * s->rx_n_channel);
init_rbuf(&tx_rbuf, "TX ring buffer", s->txrx_buf_size, PACKET_SIZE * s->tx_n_channel);
log_debug("DEBUG", "rbuf_read_amount(tx_rbuf) = %d", rbuf_read_amount(&tx_rbuf));
for(int i = 0; i < s->tx_n_channel; i++) {
char name[256];
sprintf(name, "TRXWrite Ring Buffer %d", i);
init_rbuf(&trxw_rbuf[i], name, s->trx_buf_size, sizeof(Complex));
init_rbuf(&trxw_rbuf[i], name, s->trx_buf_size, IQ_PAYLOAD);
}
for(int i = 0; i < s->rx_n_channel; i++) {
char name[256];
sprintf(name, "TRXRead Ring Buffer %d", i);
init_rbuf(&trxr_rbuf[i], name, s->trx_buf_size, sizeof(Complex));
init_rbuf(&trxr_rbuf[i], name, s->trx_buf_size, IQ_PAYLOAD);
}
memset((uint8_t *) packet_header, 0, PACKET_HEADER);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment