Commit d4d210f2 authored by Joanne Hugé's avatar Joanne Hugé

wip

parent 1a4152ac
......@@ -10,7 +10,7 @@ CFLAGS+=-MMD -g
CXXFLAGS=$(CFLAGS)
# binary name
APP = trx_ecpri_dpdk
APP = trx_ecpri
LIB = lib$(APP).so
# all source are stored in SRCS-y
......@@ -18,15 +18,7 @@ SRCS-y := $(APP).c
PKGCONF ?= pkg-config
# Build using pkg-config variables if possible
ifneq ($(shell $(PKGCONF) --exists libdpdk && echo 0),0)
$(error "no installation of DPDK found")
endif
PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk)
LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)
LDFLAGS_STATIC = $(shell $(PKGCONF) --static)
LDFLAGS = -l:libpthread.a -l:libm.a
all: $(LIB)
......
......@@ -36,22 +36,22 @@
#endif
typedef struct {
volatile char * buffer;
uint8_t * buffer;
char name[64];
size_t buf_len;
size_t block_len;
volatile int write_index;
volatile int read_index;
int write_index;
int read_index;
} ring_buffer_t;
static void init_rbuf(ring_buffer_t * rbuf, char * name, size_t buf_len,
size_t block_len) {
//log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, len);
if (!(buf_len % block_len))
log_exit("buf_len needs to be a multiple of block_len");
log_error("INIT_RBUF", "buf_len needs to be a multiple of block_len");
rbuf->buf_len = buf_len;
rbuf->block_len = block_len;
rbuf->buffer = (char *) malloc(buf_len);
rbuf->buffer = (uint8_t *) malloc(buf_len);
rbuf->write_index = 0;
rbuf->read_index = 0;
strcpy(rbuf->name, name);
......@@ -65,22 +65,20 @@ static int rbuf_write_amount(ring_buffer_t * rbuf) {
// That we way we don't have to use locks
return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len);
}
static volatile char * rbuf_write(ring_buffer_t * rbuf) {
static uint8_t * rbuf_write(ring_buffer_t * rbuf) {
return rbuf->buffer + rbuf->write_index;
}
static void rbuf_increment_write(ring_buffer_t * rbuf, size_t size) {
if(!(size % rbuf->block_len))
log_exit("size needs to be a multiple of buffer block length");
log_error("RBUF", "size needs to be a multiple of buffer block length");
rbuf->write_index = (rbuf->write_index + size) % rbuf->buf_len;
}
static void rbuf_increment_read(ring_buffer_t * rbuf, size_t size) {
if(!(size % rbuf->block_len))
log_exit("size needs to be a multiple of buffer block length");
log_error("RBUF", "size needs to be a multiple of buffer block length");
rbuf->read_index = (rbuf->read_index + size) % rbuf->buf_len;
}
static volatile char * rbuf_read(ring_buffer_t * rbuf, size_t * size) {
if(!(size % rbuf->block_len))
log_exit("size needs to be a multiple of buffer block length");
static uint8_t * rbuf_read(ring_buffer_t * rbuf) {
return rbuf->buffer + rbuf->read_index;
}
static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) {
......@@ -98,7 +96,7 @@ static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, in
static ring_buffer_t test_rbuf;
int main(int argc, char ** argv) {
char volatile * data;
uint8_t * data;
size_t size;
int i, j;
......
......@@ -162,21 +162,21 @@ static ring_buffer_t tx_rbuf; // Packets to send, ethernet frames
static ring_buffer_t trxw_rbuf[MAX_CHANNELS]; // Uncompressed IQ samples
// Counters
static volatile counter_stat_t recv_counter; // frames received from RRH
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t sent_counter; // frames sent to RRH
static volatile counter_stat_t rx_drop_counter; // frames sent to RRH
static volatile counter_stat_t tx_drop_counter; // frames sent to RRH
static counter_stat_t recv_counter; // frames received from RRH
static counter_stat_t decode_counter; // decoded frames
static counter_stat_t read_counter; // frames passed to amarisoft stack
static counter_stat_t write_counter; // samples to write from TRX
static counter_stat_t encode_counter; // encoded frames
static counter_stat_t sent_counter; // frames sent to RRH
static counter_stat_t rx_drop_counter; // frames sent to RRH
static counter_stat_t tx_drop_counter; // frames sent to RRH
// Network
static volatile uint8_t seq_id;
static volatile uint8_t frame_id;
static volatile uint8_t sub_frame_id;
static volatile uint8_t slot_id;
static volatile uint8_t symbol_id;
static uint8_t seq_id;
static uint8_t frame_id;
static uint8_t sub_frame_id;
static uint8_t slot_id;
static uint8_t symbol_id;
static int send_sockfd;
static int recv_sockfd;
static struct sockaddr_ll connect_sk_addr;
......@@ -283,13 +283,13 @@ static void log_exit(const char * section, const char * msg, ...) {
exit(EXIT_FAILURE);
}
static void init_counter(volatile counter_stat_t * c) {
static void init_counter(counter_stat_t * c) {
c->counter = 0;
c->pps_counter = 0;
c->pps_ts = 0;
c->pps = 0;
}
static void update_counter_pps(volatile counter_stat_t * c) {
static void update_counter_pps(counter_stat_t * c) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_TAI, &_ts);
......@@ -301,11 +301,11 @@ static void update_counter_pps(volatile counter_stat_t * c) {
c->pps_ts = ts;
}
}
static void update_counter(volatile counter_stat_t * c, int64_t v) {
static void update_counter(counter_stat_t * c, int64_t v) {
c->counter += v;
}
#define RX_BURST_SIZE
#define RX_BURST_SIZE 4000
static void *recv_thread(void *p) {
cpu_set_t mask;
......@@ -329,11 +329,11 @@ static void *recv_thread(void *p) {
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < RX_BURST_SIZE; j++) {
msgv[j].iov_base = rbuf_write(&rx_rbuf, rx_rbuf.block_len);
msgv[j].iov_base = rbuf_write(&rx_rbuf);
msgv[j].iov_len = rx_rbuf.block_len;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
rbuf_update_write_index(&rx_rbuf, rx_rbuf.block_len);
rbuf_increment_write(&rx_rbuf, rx_rbuf.block_len);
}
ret = recvmmsg(recv_sockfd, msgh, RX_BURST_SIZE, 0, NULL);
......@@ -347,7 +347,7 @@ static void *recv_thread(void *p) {
pthread_exit(EXIT_SUCCESS);
}
#define TX_BURST_SIZE
#define TX_BURST_SIZE 4000
// Send as soon as packets are encoded
static void *send_thread(void *p) {
......@@ -380,8 +380,8 @@ static void *send_thread(void *p) {
int burst_size;
int to_send = rbuf_read_amount(&tx_rbuf);
for(burst_size ; to_send ; burst_size++) {
msgv[burst_size].iov_base = rbuf_read(&tx_rbuf, tx_rbuf.block_len);
for(burst_size = 0 ; to_send ; burst_size++) {
msgv[burst_size].iov_base = rbuf_read(&tx_rbuf);
msgv[burst_size].iov_len = tx_rbuf.block_len;
rbuf_increment_read(&tx_rbuf, tx_rbuf.block_len);
if(burst_size > TX_BURST_SIZE)
......@@ -390,11 +390,10 @@ static void *send_thread(void *p) {
}
for(int msg_sent = 0; msg_sent < burst_size;) {
ret = sendmmsg(send_sockfd, msgh + msg_sent, (burst_size - msg_sent), 0);
int ret = sendmmsg(send_sockfd, msgh + msg_sent, (burst_size - msg_sent), 0);
if(ret <= 0)
error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret);
msg_sent += ret;
sent_frame_count += ret;
update_counter(&sent_counter, ret);
}
}
......@@ -405,9 +404,6 @@ static void *encode_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int64_t target_counter = 0;
struct timespec next;
int reset_encode_counter = 1;
uint8_t * data;
// Set thread CPU affinity
......@@ -418,7 +414,7 @@ static void *encode_thread(void *p) {
for(int64_t i = 0;; i++) {
int64_t to_write, to_read;
uint16_t * 16_bit;
uint16_t * word;
int j;
// If we have frames to encode (is there space in TX buffer)
......@@ -426,23 +422,23 @@ static void *encode_thread(void *p) {
to_write = rbuf_write_amount(&tx_rbuf);
to_read = rbuf_read_amount(&trxw_rbuf[0]);
if(!to_write || !to_read)
continue
continue;
for(uint16_t antenna_id = 0 ; antenna_id < s->tx_n_channel; antenna_id++) {
data = rbuf_write(&tx_rbuf);
j = ETHERNET_HEADER + ECPRI_COMMON_HEADER;
// PC_ID
*((uint16_t *) data[j + 0]) = htons(antenna_id);
*((uint16_t *) (data + j + 0)) = htons(antenna_id);
// SEQ_ID
*((uint16_t *) data[j + 2]) = htons(seq_id++ << 8 + 0x80);
*((uint16_t *) (data + j + 2)) = htons( (((uint16_t) seq_id++) << 8) + 0x80);
j = ETHERNET_HEADER + ECPRI_IQ_HEADER;
// ORAN counters
data[j + 1] = frame_id;
16_bit = (uint16_t *) data[j + 2];
*16_bit = sub_frame_id << 8;
*16_bit |= slot_id << 4 ;
*16_bit |= symbol_id;
word = (uint16_t *) (data + j + 2);
*word = sub_frame_id << 8;
*word |= slot_id << 4 ;
*word |= symbol_id;
// 8832 bytes of IQ samples
......@@ -451,14 +447,14 @@ static void *encode_thread(void *p) {
int count_left = IQ_PAYLOAD;
while((write_count = rbuf_contiguous_copy(&trxw_rbuf[antenna_id], NULL,
count_left))) {
memcpy(data[PACKET_HEADER] + (IQ_PAYLOAD - count_left),
rbuf_read(&trxw_rbuf[antenna_id], IQ_PAYLOAD - count_left),
memcpy(data + PACKET_HEADER + IQ_PAYLOAD - count_left,
rbuf_read(&trxw_rbuf[antenna_id]),
write_count);
rbuf_increment_read(&trxw_rbuf[antenna_id], write_count);
rbuf_increment_write(&tx_rbuf, write_count);
rbuf_increment_read(trxw_rbuf[0], write_count);
rbuf_increment_read(&trxw_rbuf[0], write_count);
count_left -= write_count;
}
if(count_left)
......@@ -482,10 +478,8 @@ static void *encode_thread(void *p) {
static void *decode_thread(void *p) {
cpu_set_t mask;
uint8_t * data;
TRXEcpriState * s = (TRXEcpriState *) p;
struct timespec next;
int64_t target_counter = 0;
int reset_decode_counter = 1;
log_info("DECODE_THREAD", "Thread init");
// Set thread CPU affinity
......@@ -500,19 +494,21 @@ static void *decode_thread(void *p) {
to_read = rbuf_read_amount(&rx_rbuf);
to_write = rbuf_write_amount(&trxr_rbuf[0]);
if(!to_write || !to_read)
continue
continue;
data = rbuf_read(&rx_rbuf);
data = rbuf_read(&rx_rbuf, rx_rbuf.block_len);
antenna_id = htons((uint16_t *) data[j + 0]);
seq_id = htons((uint16_t *) data[j + 2]);
int j = ETHERNET_HEADER + ECPRI_COMMON_HEADER;
uint16_t antenna_id = ntohs(*((uint16_t *) (data + j + 0)));
uint8_t seq_id = ((uint8_t) (*((uint16_t * ) (data + j + 2)) >> 8));
int write_count;
int count_left = IQ_PAYLOAD;
while((write_count = rbuf_contiguous_copy(NULL,
&trxr_rbuf[antenna_id], IQ_PAYLOAD))) {
memcpy(trxr_rbuf[antenna_id] + (IQ_PAYLOAD - count_left),
rbuf_read(&rx_rbuf, IQ_PAYLOAD - count_left),
memcpy(trxr_rbuf[antenna_id].buffer + (IQ_PAYLOAD - count_left),
data + PACKET_HEADER + IQ_PAYLOAD - count_left,
write_count);
rbuf_increment_write(&trxr_rbuf[antenna_id], write_count);
rbuf_increment_read(&rx_rbuf, write_count);
......
......@@ -15,7 +15,7 @@ static void log_error(const char * section, const char * msg, ...) {
exit(EXIT_FAILURE);
}
static volatile int64_t limit_counter = 0;
static int64_t limit_counter = 0;
static inline void log_limit(const char * section, const char * msg, ...) {
time_t t;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment