Commit a24758be authored by Joanne Hugé's avatar Joanne Hugé

Finish rewriting send_thread

parent 14b45dba
......@@ -343,88 +343,56 @@ static void *recv_thread(void *p) {
pthread_exit(EXIT_SUCCESS);
}
#define MAX_PACKET_BURST
// Send as soon as packets are encoded
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial, next;
struct timespec t1[4000];
struct timespec t2[4000];
int k = 0;
cpu_set_t mask;
struct timespec initial;
struct mmsghdr msgh[MAX_PACKET_BURST];
struct iovec msgv[MAX_PACKET_BURST];
TRXEcpriState * s = (TRXEcpriState *) p;
struct mmsghdr msgh[4000];
struct iovec msgv[4000];
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno,
"Could not set CPU affinity to CPU %d\n", s->send_affinity);
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < ecpri_period_mult; j++) {
for(int j = 0; j < MAX_PACKET_BURST; j++) {
msgh[j].msg_hdr.msg_name = &connect_sk_addr;
msgh[j].msg_hdr.msg_namelen = sizeof(connect_sk_addr);
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
}
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_wait(&tx_ready_cond, &tx_ready_mutex);
pthread_mutex_unlock(&tx_ready_mutex);
clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) {
int ret, msg_sent;
#ifdef DEBUG
if(i > SEND_LIMIT) {
int64_t d, dt;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
for(int j = 0; j < k; j++) {
dt = calcdiff_ns(t2[j], t1[j]);
log_debug("SEND_THREAD", "%" PRIi64, dt);
}
log_debug("SEND_THREAD", "Packets sent: %" PRIi64, sent_frame_count);
log_debug("SEND_THREAD", "Duration: %" PRIi64, d);
log_debug("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_debug("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
exit(EXIT_SUCCESS);
}
#endif
for(int64_t i = 1;; i++) {
next = initial;
// Multiply by i everytime to prevent any frequence drift
add_ns(&next, (ecpri_period_mult * NSEC_PER_SEC * i) / FRAME_FREQ);
int burst_size;
int to_send = rbuf_read_amount(&tx_rbuf);
for(int j = 0; j < ecpri_period_mult; j++) {
msgv[j].iov_base = RBUF_READ(tx_rbuf, uint8_t);
msgv[j].iov_len = tx_rbuf.len;
rbuf_update_read_index(&tx_rbuf);
for(burst_size ; to_send ; burst_size++) {
msgv[burst_size].iov_base = rbuf_read(&tx_rbuf, tx_rbuf.block_len);
msgv[burst_size].iov_len = tx_rbuf.block_len;
rbuf_increment_read(&tx_rbuf, tx_rbuf.block_len);
if(burst_size > MAX_PACKET_BURST)
log_exit("SEND_THREAD", "Too many burst packets");
to_send = rbuf_read_amount(&tx_rbuf);
}
for(msg_sent = 0; msg_sent < ecpri_period_mult;) {
#ifdef DEBUG
clock_gettime(CLOCK_TAI, &t1[k]);
#endif
ret = sendmmsg(send_sockfd, msgh + msg_sent, (ecpri_period_mult - msg_sent), 0);
#ifdef DEBUG
clock_gettime(CLOCK_TAI, &t2[k++]);
#endif
for(int msg_sent = 0; msg_sent < burst_size;) {
ret = sendmmsg(send_sockfd, msgh + msg_sent, (burst_size - msg_sent), 0);
if(ret <= 0)
error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret);
msg_sent += ret;
sent_frame_count += ret;
update_counter(&sent_counter, ret);
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_signal(&tx_cond);
pthread_mutex_unlock(&tx_mutex);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment