Commit f9bc3698 authored by Joanne Hugé's avatar Joanne Hugé

Support variable IQ payload

parent b27b6ce2
...@@ -55,8 +55,11 @@ ...@@ -55,8 +55,11 @@
#define ECPRI_IQ_HEADER (ECPRI_COMMON_HEADER + 4) #define ECPRI_IQ_HEADER (ECPRI_COMMON_HEADER + 4)
#define ORAN_HEADER 8 #define ORAN_HEADER 8
#define PACKET_HEADER (ETHERNET_HEADER + ECPRI_IQ_HEADER + ORAN_HEADER) #define PACKET_HEADER (ETHERNET_HEADER + ECPRI_IQ_HEADER + ORAN_HEADER)
#define IQ_PAYLOAD 8832 // MAX_IQ_PAYLOAD + PAYLOAD_SIZE
#define PACKET_SIZE (PACKET_HEADER + IQ_PAYLOAD) #define MAX_IQ_PAYLOAD (8832 + 2)
#define TX_IQ_PAYLOAD 4414
#define MAX_PACKET_SIZE (PACKET_HEADER + MAX_IQ_PAYLOAD)
#define TX_PACKET_SIZE (PACKET_HEADER + TX_IQ_PAYLOAD)
#define MAX_RX_BURST 1024 #define MAX_RX_BURST 1024
#define MAX_TX_BURST 1024 #define MAX_TX_BURST 1024
...@@ -172,7 +175,7 @@ typedef struct { ...@@ -172,7 +175,7 @@ typedef struct {
uint8_t seq_id; uint8_t seq_id;
uint8_t seq_id_fixed; uint8_t seq_id_fixed;
uint8_t oran_header[ORAN_HEADER]; uint8_t oran_header[ORAN_HEADER];
uint8_t iq_samples[IQ_PAYLOAD]; uint8_t iq_samples[MAX_IQ_PAYLOAD];
} ecpri_iq_packet; } ecpri_iq_packet;
typedef struct { typedef struct {
...@@ -192,8 +195,8 @@ typedef struct { ...@@ -192,8 +195,8 @@ typedef struct {
// Buffers // Buffers
static ring_buffer_t trxr_rbuf[MAX_CHANNELS]; // Decoded IQ samples static ring_buffer_t trxr_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxw_rbuf[MAX_CHANNELS]; // Uncompressed IQ samples static ring_buffer_t trxw_rbuf[MAX_CHANNELS]; // Uncompressed IQ samples
static uint8_t rx_buf[PACKET_SIZE * MAX_RX_BURST]; static uint8_t rx_buf[MAX_PACKET_SIZE * MAX_RX_BURST];
static uint8_t tx_buf[PACKET_SIZE * MAX_TX_BURST]; static uint8_t tx_buf[MAX_PACKET_SIZE * MAX_TX_BURST];
// Counters // Counters
static counter_stat_t recv_counter; // frames received from RRH static counter_stat_t recv_counter; // frames received from RRH
...@@ -311,15 +314,15 @@ static void print_stats(FILE * f, int print_header) { ...@@ -311,15 +314,15 @@ static void print_stats(FILE * f, int print_header) {
"%" STAT_INT_LEN "" PRIi64 "pps " "%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps " "%" STAT_INT_LEN "" PRIi64 "pps "
"\n", "\n",
(N_SAMPLES * sizeof(float) * 2 * rx_drop_counter.counter) / IQ_PAYLOAD, rx_drop_counter.counter,
(N_SAMPLES * sizeof(float) * 2 * tx_drop_counter.counter) / IQ_PAYLOAD, tx_drop_counter.counter,
recv_counter.counter, recv_counter.counter,
(N_SAMPLES * sizeof(float) * 2 * read_counter.counter) / IQ_PAYLOAD, read_counter.counter,
(N_SAMPLES * sizeof(float) * 2 * write_counter.counter) / IQ_PAYLOAD, write_counter.counter,
sent_counter.counter, sent_counter.counter,
recv_counter.pps, recv_counter.pps,
(N_SAMPLES * sizeof(float) * 2 * read_counter.pps) / IQ_PAYLOAD, read_counter.pps,
(N_SAMPLES * sizeof(float) * 2 * write_counter.pps) / IQ_PAYLOAD, write_counter.pps,
sent_counter.pps); sent_counter.pps);
} }
...@@ -423,8 +426,8 @@ static void *recv_thread(void *p) { ...@@ -423,8 +426,8 @@ static void *recv_thread(void *p) {
memset(msgv, 0, sizeof(msgv)); memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh)); memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < s->rx_burst; j++) { for(int j = 0; j < s->rx_burst; j++) {
msgv[j].iov_base = rx_buf + j * PACKET_SIZE; msgv[j].iov_base = rx_buf + j * MAX_PACKET_SIZE;
msgv[j].iov_len = PACKET_SIZE; msgv[j].iov_len = MAX_PACKET_SIZE;
msgh[j].msg_hdr.msg_iov = &msgv[j]; msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1; msgh[j].msg_hdr.msg_iovlen = 1;
} }
...@@ -435,21 +438,21 @@ static void *recv_thread(void *p) { ...@@ -435,21 +438,21 @@ static void *recv_thread(void *p) {
error(EXIT_FAILURE, errno, "recvmmsg error"); error(EXIT_FAILURE, errno, "recvmmsg error");
// Process each received message // Process each received message
for(int j = 0; j < ret; j++) { for(int j = 0; j < ret; j++) {
header = (ecpri_header*) (rx_buf + j * PACKET_SIZE); header = (ecpri_header*) (rx_buf + j * MAX_PACKET_SIZE);
// Discard packet if it is not eCPRI // Discard packet if it is not eCPRI
if(header->ether_type != 0xfeae) if(header->ether_type != 0xfeae)
continue; continue;
// Stop if packet has unexpected size // Stop if packet has unexpected size
if((msgh + j)->msg_len != PACKET_SIZE) { if((msgh + j)->msg_len < (PACKET_HEADER + 32)) {
stop = 1; stop = 1;
log_info("DECODE_THREAD", "Packet doesn't have correct size (%d != %d)", (msgh + j)->msg_len, PACKET_SIZE); log_info("RECV_THREAD", "Packet doesn't have correct size (%d < %d)", (msgh + j)->msg_len, PACKET_HEADER + 32);
break; break;
} }
// If packet is a timing packet // If packet is a timing packet
if(header->ecpri_type == 255) { if(header->ecpri_type == 255) {
timing_packet = (ecpri_timing_packet*) (rx_buf + j * PACKET_SIZE); timing_packet = (ecpri_timing_packet*) (rx_buf + j * MAX_PACKET_SIZE);
log_info("DECODE_THREAD", "GPS TIME: %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", log_info("RECV_THREAD", "GPS TIME: %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
timing_packet->gps_time[0], timing_packet->gps_time[0],
timing_packet->gps_time[1], timing_packet->gps_time[1],
timing_packet->gps_time[2], timing_packet->gps_time[2],
...@@ -464,40 +467,41 @@ static void *recv_thread(void *p) { ...@@ -464,40 +467,41 @@ static void *recv_thread(void *p) {
} }
// Exit if packet is neither timing nor IQ // Exit if packet is neither timing nor IQ
else if(header->ecpri_type != 0) { else if(header->ecpri_type != 0) {
log_info("DECODE_THREAD", "Unknown eCPRI type: %d\n", header->ecpri_type); log_info("RECV_THREAD", "Unknown eCPRI type: %d\n", header->ecpri_type);
stop = 1; break; stop = 1; break;
} }
// Exit if SEQ ID is not sequential // Exit if SEQ ID is not sequential
if ( prev_seq_id != -1 && (header->seq_id + 256 - prev_seq_id) % 256 != 1 ) { if ( prev_seq_id != -1 && (header->seq_id + 256 - prev_seq_id) % 256 != 1 ) {
log_info("DECODE_THREAD", "seq_ids are not sequential (%d, %d)", log_info("RECV_THREAD", "seq_ids are not sequential (%d, %d)",
prev_seq_id, header->seq_id); prev_seq_id, header->seq_id);
stop = 1; break; stop = 1; break;
} }
// Exit if antenna ID is not in range // Exit if antenna ID is not in range
antenna_id = ntohs(header->antenna_id) - 1; antenna_id = ntohs(header->antenna_id) - 1;
if(antenna_id > s->rx_n_channel || antenna_id < 0) { if(antenna_id > s->rx_n_channel || antenna_id < 0) {
log_info("DECODE_THREAD", "Wrong Antenna ID: %d\n", antenna_id); log_info("RECV_THREAD", "Wrong Antenna ID: %d\n", antenna_id);
stop = 1; break; stop = 1; break;
} }
// Exit if there is no more space in the buffer // Exit if there is no more space in the buffer
if(!rbuf_write_amount(&trxr_rbuf[antenna_id])) if(!rbuf_write_amount(&trxr_rbuf[antenna_id]))
log_exit("DECODE_THREAD", "No more space in %s buffer", log_exit("RECV_THREAD", "No more space in %s buffer",
trxr_rbuf[antenna_id].name); trxr_rbuf[antenna_id].name);
prev_seq_id = header->seq_id; prev_seq_id = header->seq_id;
iq_packet = (ecpri_iq_packet*) (rx_buf + j * PACKET_SIZE); iq_packet = (ecpri_iq_packet*) (rx_buf + j * MAX_PACKET_SIZE);
memcpy(trxr_rbuf[antenna_id].buffer, *((uint16_t *) (trxr_rbuf[antenna_id].buffer)) = iq_packet.payload_size;
memcpy(trxr_rbuf[antenna_id].buffer + 2,
iq_packet->iq_samples, iq_packet->iq_samples,
IQ_PAYLOAD); iq_packet.payload_size);
rbuf_increment_write(&trxr_rbuf[antenna_id], IQ_PAYLOAD); rbuf_increment_write(&trxr_rbuf[antenna_id], trxr_rbuf.block_len);
update_counter(&recv_counter, 1); update_counter(&recv_counter, iq_packet.payload_size / N_SAMPLES);
} }
if(stop) { if(stop) {
for(int j = 0; j < ret; j++) { for(int j = 0; j < ret; j++) {
header = (ecpri_header*) (rx_buf + j * PACKET_SIZE); header = (ecpri_header*) (rx_buf + j * MAX_PACKET_SIZE);
print_packet(header); print_packet(header);
} }
log_exit("DECODE_THREAD", "Exiting"); log_exit("RECV_THREAD", "Exiting");
} }
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
...@@ -531,8 +535,8 @@ static void *send_thread(void *p) { ...@@ -531,8 +535,8 @@ static void *send_thread(void *p) {
msgh[j].msg_hdr.msg_namelen = sizeof(connect_sk_addr); msgh[j].msg_hdr.msg_namelen = sizeof(connect_sk_addr);
msgh[j].msg_hdr.msg_iov = &msgv[j]; msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1; msgh[j].msg_hdr.msg_iovlen = 1;
msgv[j].iov_base = tx_buf + j * PACKET_SIZE; msgv[j].iov_base = tx_buf + j * TX_PACKET_SIZE;
msgv[j].iov_len = PACKET_SIZE; msgv[j].iov_len = TX_PACKET_SIZE;
} }
clock_gettime(CLOCK_TAI, &initial); clock_gettime(CLOCK_TAI, &initial);
log_info("SEND_THREAD", "Starting loop"); log_info("SEND_THREAD", "Starting loop");
...@@ -541,7 +545,7 @@ static void *send_thread(void *p) { ...@@ -541,7 +545,7 @@ static void *send_thread(void *p) {
// Send at most tx_burst packets // Send at most tx_burst packets
int to_send = s->tx_burst / s->tx_n_channel; int to_send = s->tx_burst / s->tx_n_channel;
for(int k = 0; k < s->tx_n_channel; k++) { for(int k = 0; k < s->tx_n_channel; k++) {
int to_read = rbuf_read_amount(&trxw_rbuf[k]) / IQ_PAYLOAD; int to_read = rbuf_read_amount(&trxw_rbuf[k]) / TX_IQ_PAYLOAD;
if(to_read < to_send) if(to_read < to_send)
to_send = to_read; to_send = to_read;
} }
...@@ -549,7 +553,7 @@ static void *send_thread(void *p) { ...@@ -549,7 +553,7 @@ static void *send_thread(void *p) {
for(int encoded = 0; encoded < to_send;) { for(int encoded = 0; encoded < to_send;) {
for(uint16_t antenna_id = 0 ; antenna_id < s->tx_n_channel; antenna_id++) { for(uint16_t antenna_id = 0 ; antenna_id < s->tx_n_channel; antenna_id++) {
iq_packet = (ecpri_iq_packet*) (tx_buf + encoded * PACKET_SIZE); iq_packet = (ecpri_iq_packet*) (tx_buf + encoded * TX_PACKET_SIZE);
// PC_ID // PC_ID
iq_packet->antenna_id = htons(antenna_id) + 1; iq_packet->antenna_id = htons(antenna_id) + 1;
...@@ -564,11 +568,11 @@ static void *send_thread(void *p) { ...@@ -564,11 +568,11 @@ static void *send_thread(void *p) {
*word |= slot_id << 4 ; *word |= slot_id << 4 ;
*word |= symbol_id; *word |= symbol_id;
// Add IQ_PAYLOAD to the current packet // Add TX_IQ_PAYLOAD to the current packet
memcpy(iq_packet->iq_samples, memcpy(iq_packet->iq_samples,
rbuf_read(&trxw_rbuf[antenna_id]), rbuf_read(&trxw_rbuf[antenna_id]),
IQ_PAYLOAD); TX_IQ_PAYLOAD);
rbuf_increment_read(&trxw_rbuf[antenna_id], IQ_PAYLOAD); rbuf_increment_read(&trxw_rbuf[antenna_id], TX_IQ_PAYLOAD);
check_rbuf_read(&trxw_rbuf[antenna_id], log_exit); check_rbuf_read(&trxw_rbuf[antenna_id], log_exit);
encoded++; encoded++;
} }
...@@ -588,7 +592,7 @@ static void *send_thread(void *p) { ...@@ -588,7 +592,7 @@ static void *send_thread(void *p) {
if(ret <= 0) if(ret <= 0)
error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret); error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret);
j += ret; j += ret;
update_counter(&sent_counter, ret); update_counter(&sent_counter, ret * TX_IQ_PAYLOAD / N_SAMPLES);
} }
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
...@@ -737,12 +741,12 @@ int start(TRXEcpriState * s) { ...@@ -737,12 +741,12 @@ int start(TRXEcpriState * s) {
for(int i = 0; i < s->tx_n_channel; i++) { for(int i = 0; i < s->tx_n_channel; i++) {
char name[256]; char name[256];
sprintf(name, "TRXWrite Ring Buffer %d", i); sprintf(name, "TRXWrite Ring Buffer %d", i);
init_rbuf(&trxw_rbuf[i], name, s->trx_buf_size, IQ_PAYLOAD); init_rbuf(&trxw_rbuf[i], name, s->trx_buf_size, TX_IQ_PAYLOAD);
} }
for(int i = 0; i < s->rx_n_channel; i++) { for(int i = 0; i < s->rx_n_channel; i++) {
char name[256]; char name[256];
sprintf(name, "TRXRead Ring Buffer %d", i); sprintf(name, "TRXRead Ring Buffer %d", i);
init_rbuf(&trxr_rbuf[i], name, s->trx_buf_size, IQ_PAYLOAD); init_rbuf(&trxr_rbuf[i], name, s->trx_buf_size, MAX_IQ_PAYLOAD);
} }
if (!(if_index = if_nametoindex(s->bbu_if))) { if (!(if_index = if_nametoindex(s->bbu_if))) {
...@@ -822,7 +826,7 @@ int start(TRXEcpriState * s) { ...@@ -822,7 +826,7 @@ int start(TRXEcpriState * s) {
/* Common Header */ /* Common Header */
iq_packet.ecpri_version = 0x10; // Version 0x1, Reserved = 0, C = 0 iq_packet.ecpri_version = 0x10; // Version 0x1, Reserved = 0, C = 0
iq_packet.ecpri_type = 0x00; // Message type (IQ data) iq_packet.ecpri_type = 0x00; // Message type (IQ data)
iq_packet.payload_size = (uint16_t) IQ_PAYLOAD; iq_packet.payload_size = (uint16_t) TX_IQ_PAYLOAD;
iq_packet.seq_id_fixed = 0x80; iq_packet.seq_id_fixed = 0x80;
/* ORAN HEADER */ /* ORAN HEADER */
...@@ -833,7 +837,7 @@ int start(TRXEcpriState * s) { ...@@ -833,7 +837,7 @@ int start(TRXEcpriState * s) {
iq_packet.oran_header[6] = 0x00; iq_packet.oran_header[6] = 0x00;
for(int i = 0; i < MAX_TX_BURST; i++) for(int i = 0; i < MAX_TX_BURST; i++)
memcpy(tx_buf + i * PACKET_SIZE, (uint8_t*) &iq_packet, PACKET_SIZE); memcpy(tx_buf + i * TX_PACKET_SIZE, (uint8_t*) &iq_packet, PACKET_SIZE);
start_threads(s); start_threads(s);
return 0; return 0;
...@@ -914,34 +918,34 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -914,34 +918,34 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples,
int count, int rx_port_index, TRXReadMetadata *md) int count, int rx_port_index, TRXReadMetadata *md)
{ {
int64_t nc, count_left, offset; int64_t nc, count_left, offset, ra;
uint8_t * data;
uint16_t payload_size;
float ** _samples = (float **) __samples; float ** _samples = (float **) __samples;
TRXEcpriState *s = s1->opaque; TRXEcpriState *s = s1->opaque;
while(1) {
int ready = 1;
for(int k = 0; k < s->rx_n_channel; k++) {
if(rbuf_read_amount(&trxr_rbuf[k]) < count * sizeof(Complex)) {
ready = 0;
break;
}
}
if(ready)
break;
usleep(100);
}
for(int i = 0; i < s->rx_n_channel; i++ ) { for(int i = 0; i < s->rx_n_channel; i++ ) {
offset = 0; offset = 0;
count_left = count; count_left = count;
while((nc = rbuf_contiguous_copy(&trxr_rbuf[i], NULL, count_left * sizeof(Complex)))) { while(count_left) {
memcpy( ra = rbuf_read_amount(&trxr_rbuf[i])
((uint8_t*) _samples[i]) + offset, if(!ra)
((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[i].read_index, usleep(100);
nc); for(int j = 0; j < ra; j++) {
rbuf_increment_read(&trxr_rbuf[i], nc); data = trxr_rbuf[i].buffer + trxr_rbuf[i].read_index;
count_left -= nc / sizeof(Complex); payload_size = *((uint16_t *) data);
offset += nc; if(count_left < payload_size / sizeof(Complex))
payload_size = count_left * sizeof(Complex);
memcpy(
((uint8_t*) _samples[i]) + offset,
data + 2,
payload_size);
count_left -= payload_size / sizeof(Complex);
if(!count_left)
break;
offset += payload_size;
rbuf_increment_read(&trxr_rbuf[i], trxr_rbuf[i].block_len);
}
} }
} }
*ptimestamp = (read_counter.counter) * N_SAMPLES; *ptimestamp = (read_counter.counter) * N_SAMPLES;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment