Commit 686fa61e authored by Joanne Hugé's avatar Joanne Hugé

Add master mode

parent aa1b8a8c
...@@ -49,19 +49,15 @@ ...@@ -49,19 +49,15 @@
#include "utils.c" #include "utils.c"
//#define DEBUG // Enables / deactivates log_debug //#define DEBUG // Enables / deactivates log_debug
//#define STAT_SHOW_COUNTERS
#define PPS_UPDATE_PERIOD INT64_C(1000000000) #define PPS_UPDATE_PERIOD INT64_C(1500000000)
//#define DST_ADDR_SYNTAX // Depends on DPDK version
#define TX_PACKET_SIZE 262 #define TX_PACKET_SIZE 262
#define RX_MAX_PACKET_SIZE 262 #define RX_MAX_PACKET_SIZE 262
#define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14) #define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14)
#define N_SAMPLES (32) #define N_SAMPLES (32)
#define TRX_MAX_GROUP 1500 #define TRX_MAX_GROUP 1500
#define STAT_INT_LEN "8"
#define STAT_INT_LEN "9" //#define DST_ADDR_SYNTAX // Depends on DPDK version
typedef struct { typedef struct {
float re; float re;
...@@ -69,32 +65,43 @@ typedef struct { ...@@ -69,32 +65,43 @@ typedef struct {
} Complex; } Complex;
typedef struct { typedef struct {
// Log
const uint8_t * log_directory;
// Network
const uint8_t * re_mac; const uint8_t * re_mac;
const uint8_t * rec_mac; const uint8_t * rec_mac;
const uint8_t * rec_if; const uint8_t * rec_if;
const char * dpdk_options; const char * dpdk_options;
const uint8_t * log_directory; // RF
int rx_n_channel;
int tx_n_channel;
int frame_frequency;
int tdd_period;
int sample_rate;
// Perfomance / RT
int recv_affinity; int recv_affinity;
int send_affinity; int send_affinity;
int encode_affinity; int encode_affinity;
int decode_affinity; int decode_affinity;
int statistic_affinity; int statistic_affinity;
int ecpri_period; int ecpri_period;
int flow_id;
int frame_frequency;
int trx_buf_size; int trx_buf_size;
int txrx_buf_size; int txrx_buf_size;
int encode_burst;
int send_burst;
int statistics_refresh_rate_ns;
// Trace / Monitor
int trace_rx; int trace_rx;
int trace_tx; int trace_tx;
int trace_offset; int trace_offset;
int monitor_pps; int monitor_pps;
int monitor_trigger_duration; int monitor_trigger_duration;
int start_sending; // eCPRI
int flow_id;
int start_receiving; int start_receiving;
int rx_n_channel; int master;
int tx_n_channel; int generic_data_sync;
int statistics_refresh_rate_ns; int trx_read_null;
int sample_rate;
} TRXEcpriState; } TRXEcpriState;
typedef struct { typedef struct {
...@@ -140,9 +147,11 @@ static volatile counter_stat_t decode_counter; // decoded frames ...@@ -140,9 +147,11 @@ static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t trx_encode_counter; // encoded frames
static volatile counter_stat_t sent_counter; // frames sent to eRE static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // frames sent to eRE static volatile counter_stat_t rx_drop_counter; // frames sent to eRE
static volatile counter_stat_t tx_drop_counter; // frames sent to eRE static volatile counter_stat_t tx_drop_counter; // frames sent to eRE
static volatile counter_stat_t empty_encode_counter; // frames sent to eRE
static volatile int sync_complete = 0; static volatile int sync_complete = 0;
static volatile int received_pkts = 0; static volatile int received_pkts = 0;
...@@ -160,8 +169,12 @@ static int64_t decode_counter_prev = 0; ...@@ -160,8 +169,12 @@ static int64_t decode_counter_prev = 0;
static int recv_pps_threshold; static int recv_pps_threshold;
//DEBUG
static volatile int min_count_trx = 1000000;
// Network // Network
static volatile int seq_id; static volatile int tx_seq_id;
static volatile int rx_seq_id;
static void rbuf_update_write_index(ring_buffer_t * rbuf) { static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len; rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
...@@ -206,66 +219,150 @@ static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, in ...@@ -206,66 +219,150 @@ static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, in
static void print_stats(FILE * f, int print_header) { static void print_stats(FILE * f, int print_header) {
if(print_header) { if(print_header) {
fprintf(f, fprintf(f,
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " #ifdef STAT_SHOW_COUNTERS
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " #endif
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"\n", "\n",
"rx dropped", "RX",
"tx dropped", "TX",
"received", #ifdef STAT_SHOW_COUNTERS
"decode", "RECEIVED",
"read", "DECODE",
"write", "READ",
"encode", "WRITE",
"sent", "ENCODE",
"received pps", "SENT",
"decode pps", "EMPTY TX",
"read pps", #endif
"write pps", "RECEIVED",
"encode pps", "DECODE",
"sent pps"); "READ",
"WRITE",
"ENCODE",
"SENT",
"EMPTY TX",
"RX",
"TRXR",
"TRXW",
"TX");
fprintf(f,
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
#ifdef STAT_SHOW_COUNTERS
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
#endif
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"\n",
"DROPPED",
"DROPPED",
#ifdef STAT_SHOW_COUNTERS
"COUNTER",
"COUNTER",
"COUNTER",
"COUNTER",
"COUNTER",
"COUNTER",
"COUNTER",
#endif
"PPS",
"PPS",
"PPS",
"PPS",
"PPS",
"PPS",
"PPS",
"DELAY",
"DELAY",
"DELAY",
"DELAY");
} }
fprintf(f, fprintf(f,
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " " #ifdef STAT_SHOW_COUNTERS
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps " #endif
"%" STAT_INT_LEN "" PRIi64 "pps " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps " "%-" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps " "%-" STAT_INT_LEN "" PRIi64 " "
"%-" STAT_INT_LEN "" PRIi64 " "
"%-" STAT_INT_LEN "" PRIi64 " "
"%-" STAT_INT_LEN "" PRIi64 " "
"%-" STAT_INT_LEN "" PRIi64 " "
"%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d "
"\n", "\n",
rx_drop_counter.counter, rx_drop_counter.counter,
tx_drop_counter.counter, tx_drop_counter.counter,
#ifdef STAT_SHOW_COUNTERS
recv_counter.counter, recv_counter.counter,
decode_counter.counter, decode_counter.counter,
read_counter.counter, read_counter.counter,
write_counter.counter, write_counter.counter,
encode_counter.counter, encode_counter.counter,
sent_counter.counter, sent_counter.counter,
empty_encode_counter.counter,
#endif
recv_counter.pps, recv_counter.pps,
decode_counter.pps, decode_counter.pps,
read_counter.pps, read_counter.pps,
write_counter.pps, write_counter.pps,
encode_counter.pps, encode_counter.pps,
sent_counter.pps); sent_counter.pps,
empty_encode_counter.pps,
rbuf_read_amount(&rx_rbuf),
rbuf_read_amount(&trxr_rbuf[0]),
rbuf_read_amount(&trxw_rbuf[0]),
rbuf_read_amount(&tx_rbuf),
tx_rbuf.write_index,
tx_rbuf.read_index,
min_count_trx);
} }
static void log_exit(const char * section, const char * msg, ...) { static void log_exit(const char * section, const char * msg, ...) {
...@@ -286,6 +383,7 @@ static void log_exit(const char * section, const char * msg, ...) { ...@@ -286,6 +383,7 @@ static void log_exit(const char * section, const char * msg, ...) {
// Dump useful information // Dump useful information
print_stats(stderr, 1); print_stats(stderr, 1);
fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf)); fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf));
fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf)); fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf));
fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0])); fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0])); fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0]));
...@@ -538,6 +636,18 @@ static void *recv_thread(void *p) { ...@@ -538,6 +636,18 @@ static void *recv_thread(void *p) {
printf("seq_id = %d\n", seq_id); printf("seq_id = %d\n", seq_id);
first_seq_id = 0; first_seq_id = 0;
} }
if(s->master == 1) {
uint16_t _rx_seq_id;
uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
rx_seq_id++;
_rx_seq_id = rx_seq_id % 65536;
if(_rx_seq_id != seq_id) {
if(_rx_seq_id > seq_id)
rx_seq_id += 65536 + seq_id - _rx_seq_id;
else
rx_seq_id += seq_id - _rx_seq_id;
}
}
memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len); memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len);
#else #else
...@@ -600,6 +710,96 @@ static void *send_thread(void *p) { ...@@ -600,6 +710,96 @@ static void *send_thread(void *p) {
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
} }
static void encode_empty_frames(int n, int tx_n_channel, int trx, int tdd_period) {
uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8;
int n2 = n;
for(int i = 0; i < n; i++) {
//if(trx && !((trx_encode_counter.counter + i) % tdd_period)) {
// *(buf - 7) = 3;
// *((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
// buf += tx_rbuf.len;
// n2++;
//}
memset(buf, 0x00, 60 * tx_n_channel);
*((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
buf += tx_rbuf.len;
}
tx_rbuf.write_index = (tx_rbuf.write_index + n2) % tx_rbuf.buf_len;
if(trx)
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + n) % trxw_rbuf[0].buf_len;
}
static void encode_trx_frames(int n, int tx_n_channel, int tdd_period) {
int nc;
int nf = n;
while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) {
Complex * iq_samples[4];
uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8;
int nc2 = nc;
for(int j = 0; j < tx_n_channel; j++)
iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len);
for(int i = 0; i < nc; i++) {
//if(!((trx_encode_counter.counter + i) % tdd_period)) {
// *(buf - 7) = 3;
// *((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
// buf += tx_rbuf.len;
// nc2++;
//}
for(int i = 0; i < tx_n_channel ; i++)
encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]);
*((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
for(int j = 0; j < tx_n_channel; j++)
iq_samples[j] += trxw_rbuf[0].len;
buf += tx_rbuf.len;
}
tx_rbuf.write_index = (tx_rbuf.write_index + nc2) % tx_rbuf.buf_len;
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len;
nf -= nc;
}
if(nf)
exit(EXIT_FAILURE);
}
int read_trx(int n_max, TRXEcpriState * s) {
int remain = n_max;
while(remain && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
sample_group_t * g;
int n_trx;
g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
if(g->wait) {
g->wait = 0;
g->count -= trx_encode_counter.counter;
g->zeroes = 1;
}
n_trx = g->count > remain ? remain : g->count;
g->count -= n_trx;
if(s->trace_tx) {
if(sync_complete && (encode_counter.counter + n_trx) >= (tx_rbuf.buf_len + s->trace_offset)) {
tx_trace_ready = 1;
log_info("ENCODE_THREAD", "TX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (tx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
if(g->zeroes)
encode_empty_frames(n_trx, s->tx_n_channel, 1, s->tdd_period);
else
encode_trx_frames(n_trx, s->tx_n_channel, s->tdd_period);
if(!g->count) {
rbuf_update_read_index(&trxw_group_rbuf);
}
update_counter(&trx_encode_counter, n_trx);
remain -= n_trx;
}
return (n_max - remain);
}
/* /*
If sync has happenned (=we have received frames): If sync has happenned (=we have received frames):
Prepare as soon as TRX has packet to write Prepare as soon as TRX has packet to write
...@@ -608,13 +808,14 @@ Else: ...@@ -608,13 +808,14 @@ Else:
Prepare as soon as there is space in tx buffer Prepare as soon as there is space in tx buffer
*/ */
#define TX_SYNC_BURST_SIZE 512 #define TX_SYNC_BURST_SIZE 512
#define MASTER_BURST 1024
static void *encode_thread(void *p) { static void *encode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
int64_t target_counter = 0; int64_t target_counter = 0;
struct timespec next; struct timespec next;
int reset_encode_counter = 1; int64_t initial_ts = 0;
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
...@@ -623,100 +824,39 @@ static void *encode_thread(void *p) { ...@@ -623,100 +824,39 @@ static void *encode_thread(void *p) {
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);
for(int64_t i = 0;; i++) { for(int64_t i = 0;; i++) {
int n; int n, n_trx;
int n_empty = 0;
n = rbuf_write_amount(&tx_rbuf); int n_min, n_max;
// Send empty frames until we receive something if(s->master && !i) {
if(s->start_sending) {
if(!sync_complete) {
if(i == 0)
clock_gettime(CLOCK_TAI, &next); clock_gettime(CLOCK_TAI, &next);
// Limit packets sent initial_ts = ts_to_int(next);
if(encode_counter.counter > target_counter) { add_ns(&next, (((int64_t) MASTER_BURST) * NSEC_PER_SEC) / s->frame_frequency);
int k = (encode_counter.counter - target_counter + (s->frame_frequency / 100) - 1) / (s->frame_frequency / 100);
add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
target_counter += k * s->frame_frequency / 100;
}
n = (n > TX_SYNC_BURST_SIZE) ? n : TX_SYNC_BURST_SIZE;
n = (n < (s->frame_frequency / 100)) ? n : (s->frame_frequency / 100);
for(int j = 0; j < n; j++) {
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
update_counter(&encode_counter, n);
} }
else if (reset_encode_counter) {
if(s->trace_tx)
encode_counter_prev = encode_counter.counter;
encode_counter.counter = 0;
reset_encode_counter = 0;
seq_id = 0;
}
}
// If we have frames to encode (is there space in TX buffer)
// If there are frames from trx_write callback to encode
if(n && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
sample_group_t * g; int nb_frames;
g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
if(g->wait) {
g->wait = 0;
g->count -= encode_counter.counter;
g->zeroes = 1;
}
nb_frames = g->count > n ? n : g->count;
g->count -= nb_frames;
if(s->trace_tx) {
if(sync_complete && (encode_counter.counter + nb_frames) >= (tx_rbuf.buf_len + s->trace_offset)) {
tx_trace_ready = 1;
log_info("ENCODE_THREAD", "TX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (tx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
if(g->zeroes) {
for(int j = 0; j < nb_frames; j++) {
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
}
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nb_frames) % trxw_rbuf[0].buf_len;
} else {
int nc;
int nf = nb_frames;
while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) {
Complex * iq_samples[4];
uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8;
for(int j = 0; j < s->tx_n_channel; j++)
iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len);
for(int i = 0; i < nc; i++) {
for(int i = 0; i < s->tx_n_channel ; i++)
encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]);
*((uint16_t *)(buf - 2)) = htons(seq_id++);
for(int j = 0; j < s->tx_n_channel; j++)
iq_samples[j] += trxw_rbuf[0].len;
buf += tx_rbuf.len;
}
tx_rbuf.write_index = (tx_rbuf.write_index + nc) % tx_rbuf.buf_len;
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len;
nf -= nc;
}
if(nf)
exit(EXIT_FAILURE);
n = rbuf_write_amount(&tx_rbuf);
n_min = s->encode_burst;
n_min += s->generic_data_sync ? (s->encode_burst / s->tdd_period) + 1 : 0;
if(s->master && n < n_min)
log_exit("ENCODE_THREAD", "Not enough space in TX RBUF (%d < %d)\n", n, s->encode_burst);
n_max = s->encode_burst ? s->encode_burst : n;
n_trx = read_trx(n_max, s);
if(s->master) {
struct timespec current;
n_empty = s->encode_burst - n_trx;
encode_empty_frames(n_empty, s->tx_n_channel, 0, s->tdd_period);
target_counter += s->encode_burst;
next = int_to_ts(initial_ts + target_counter * NSEC_PER_SEC / s->frame_frequency);
do {
clock_gettime(CLOCK_TAI, &current);
} while(current.tv_sec < next.tv_sec || (current.tv_sec == next.tv_sec && current.tv_nsec < next.tv_nsec));
} }
update_counter(&encode_counter, nb_frames);
if(!g->count) { update_counter(&encode_counter, n_trx + n_empty);
rbuf_update_read_index(&trxw_group_rbuf); update_counter(&empty_encode_counter, n_empty);
}
}
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
} }
...@@ -850,7 +990,9 @@ static void *statistic_thread(void *p) { ...@@ -850,7 +990,9 @@ static void *statistic_thread(void *p) {
update_counter_pps(&read_counter); update_counter_pps(&read_counter);
update_counter_pps(&write_counter); update_counter_pps(&write_counter);
update_counter_pps(&encode_counter); update_counter_pps(&encode_counter);
update_counter_pps(&trx_encode_counter);
update_counter_pps(&sent_counter); update_counter_pps(&sent_counter);
update_counter_pps(&empty_encode_counter);
if(s->monitor_pps) { if(s->monitor_pps) {
if(recv_counter.pps > recv_pps_threshold) { if(recv_counter.pps > recv_pps_threshold) {
recv_pps_threshold_hit = 1; recv_pps_threshold_hit = 1;
...@@ -1020,7 +1162,7 @@ int startdpdk(TRXEcpriState * s) { ...@@ -1020,7 +1162,7 @@ int startdpdk(TRXEcpriState * s) {
//set_latency_target(); //set_latency_target();
seq_id = 0; tx_seq_id = 0;
init_counter(&rx_drop_counter); init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter); init_counter(&tx_drop_counter);
init_counter(&recv_counter); init_counter(&recv_counter);
...@@ -1028,7 +1170,9 @@ int startdpdk(TRXEcpriState * s) { ...@@ -1028,7 +1170,9 @@ int startdpdk(TRXEcpriState * s) {
init_counter(&read_counter); init_counter(&read_counter);
init_counter(&write_counter); init_counter(&write_counter);
init_counter(&encode_counter); init_counter(&encode_counter);
init_counter(&trx_encode_counter);
init_counter(&sent_counter); init_counter(&sent_counter);
init_counter(&empty_encode_counter);
RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, RX_MAX_PACKET_SIZE, uint8_t); RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, TX_ECPRI_PACKET_SIZE, uint8_t); RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, TX_ECPRI_PACKET_SIZE, uint8_t);
...@@ -1099,6 +1243,9 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -1099,6 +1243,9 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
write_count = count / M; write_count = count / M;
ts = timestamp / M; ts = timestamp / M;
if(count)
min_count_trx = count < min_count_trx ? count : min_count_trx;
log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count); log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count);
if(prev_count && ((ts - prev_ts) != prev_count)) { if(prev_count && ((ts - prev_ts) != prev_count)) {
...@@ -1171,6 +1318,7 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa ...@@ -1171,6 +1318,7 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
for(int i = 0; i < s->rx_n_channel; i++ ) { for(int i = 0; i < s->rx_n_channel; i++ ) {
uint8_t * dst = (uint8_t*) (_samples[i] + offset); uint8_t * dst = (uint8_t*) (_samples[i] + offset);
uint8_t * src = ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex); uint8_t * src = ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex);
if(!s->trx_read_null)
memcpy(dst, src, len); memcpy(dst, src, len);
} }
trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len; trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len;
...@@ -1437,14 +1585,24 @@ int trx_driver_init(TRXState *s1) ...@@ -1437,14 +1585,24 @@ int trx_driver_init(TRXState *s1)
s->monitor_pps = (int) val; s->monitor_pps = (int) val;
trx_get_param_double(s1, &val, "monitor_trigger_duration"); trx_get_param_double(s1, &val, "monitor_trigger_duration");
s->monitor_trigger_duration = (int) val; s->monitor_trigger_duration = (int) val;
trx_get_param_double(s1, &val, "start_sending");
s->start_sending = (int) val;
trx_get_param_double(s1, &val, "start_receiving"); trx_get_param_double(s1, &val, "start_receiving");
s->start_receiving = (int) val; s->start_receiving = (int) val;
trx_get_param_double(s1, &val, "rx_n_channel"); trx_get_param_double(s1, &val, "rx_n_channel");
s->rx_n_channel = (int) val; s->rx_n_channel = (int) val;
trx_get_param_double(s1, &val, "tx_n_channel"); trx_get_param_double(s1, &val, "tx_n_channel");
s->tx_n_channel = (int) val; s->tx_n_channel = (int) val;
trx_get_param_double(s1, &val, "master");
s->master = (int) val;
trx_get_param_double(s1, &val, "generic_data_sync");
s->generic_data_sync = (int) val;
trx_get_param_double(s1, &val, "trx_read_null");
s->trx_read_null = (int) val;
trx_get_param_double(s1, &val, "tdd_period");
s->tdd_period = (int) val;
trx_get_param_double(s1, &val, "encode_burst");
s->encode_burst = (int) val;
trx_get_param_double(s1, &val, "send_burst");
s->send_burst = (int) val;
trx_get_param_double(s1, &val, "statistics_refresh_rate_ns"); trx_get_param_double(s1, &val, "statistics_refresh_rate_ns");
s->statistics_refresh_rate_ns = (int) val; s->statistics_refresh_rate_ns = (int) val;
trx_get_param_double(s1, &val, "ecpri_period"); trx_get_param_double(s1, &val, "ecpri_period");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment