Commit f5377651 authored by Joanne Hugé's avatar Joanne Hugé

Finish rewriting recv_thread

parent a24758be
......@@ -305,6 +305,7 @@ static void update_counter(volatile counter_stat_t * c, int64_t v) {
c->counter += v;
}
#define RX_BURST_SIZE
static void *recv_thread(void *p) {
cpu_set_t mask;
......@@ -316,41 +317,44 @@ static void *recv_thread(void *p) {
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
error(EXIT_FAILURE, errno,
"Could not set CPU affinity to CPU %d\n", s->recv_affinity);
for(;;) {
struct mmsghdr msgh[4000];
struct iovec msgv[4000];
struct mmsghdr msgh[RX_BURST_SIZE];
struct iovec msgv[RX_BURST_SIZE];
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < ecpri_period_mult; j++) {
msgv[j].iov_base = RBUF_WRITE(rx_rbuf, uint8_t);
msgv[j].iov_len = rx_rbuf.len;
for(int j = 0; j < RX_BURST_SIZE; j++) {
msgv[j].iov_base = rbuf_write(&rx_rbuf, rx_rbuf.block_len);
msgv[j].iov_len = rx_rbuf.block_len;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
rbuf_update_write_index(&rx_rbuf);
rbuf_update_write_index(&rx_rbuf, rx_rbuf.block_len);
}
ret = recvmmsg(recv_sockfd, msgh, ecpri_period_mult, 0, NULL);
ret = recvmmsg(recv_sockfd, msgh, RX_BURST_SIZE, 0, NULL);
if(ret == -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
if(ret != ecpri_period_mult)
log_error("RECV_THREAD", "recvmmsg received %d messages instead of %d\n", ret, ecpri_period_mult);
if(ret != RX_BURST_SIZE)
log_error("RECV_THREAD",
"recvmmsg received %d messages instead of %d\n", ret, RX_BURST_SIZE);
update_counter(&recv_counter, RX_BURST_SIZE);
}
pthread_exit(EXIT_SUCCESS);
}
#define MAX_PACKET_BURST
#define TX_BURST_SIZE
// Send as soon as packets are encoded
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial;
struct mmsghdr msgh[MAX_PACKET_BURST];
struct iovec msgv[MAX_PACKET_BURST];
struct mmsghdr msgh[TX_BURST_SIZE];
struct iovec msgv[TX_BURST_SIZE];
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("SEND_THREAD", "Thread init");
......@@ -363,7 +367,7 @@ static void *send_thread(void *p) {
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < MAX_PACKET_BURST; j++) {
for(int j = 0; j < TX_BURST_SIZE; j++) {
msgh[j].msg_hdr.msg_name = &connect_sk_addr;
msgh[j].msg_hdr.msg_namelen = sizeof(connect_sk_addr);
msgh[j].msg_hdr.msg_iov = &msgv[j];
......@@ -380,7 +384,7 @@ static void *send_thread(void *p) {
msgv[burst_size].iov_base = rbuf_read(&tx_rbuf, tx_rbuf.block_len);
msgv[burst_size].iov_len = tx_rbuf.block_len;
rbuf_increment_read(&tx_rbuf, tx_rbuf.block_len);
if(burst_size > MAX_PACKET_BURST)
if(burst_size > TX_BURST_SIZE)
log_exit("SEND_THREAD", "Too many burst packets");
to_send = rbuf_read_amount(&tx_rbuf);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment