Commit 631eafe2 authored by Titouan Soulard's avatar Titouan Soulard

libtrx: reintroduce queue for Recv

parent d32c6fa4
......@@ -11,7 +11,8 @@
#include "libcapulet/net_udp.h"
#define TRX_RDMA_MR_COUNT 32
#define TRX_RDMA_MR_SIZE 4096
#define TRX_RDMA_MR_SIZE 32768
#define TRX_RDMA_BUFFER_SIZE 131072
// XXX: hardcoded parameter
#define TRX_RDMA_CHANNEL_COUNT 2
......@@ -30,6 +31,8 @@ struct SDRContext {
struct CapuletRdmaIbContext ib_ctx;
struct CapuletNetUdpContext *udp_ctx;
struct CommonCBBuffer *recv_buffer;
trx_timestamp_t last_timestamp;
struct ibv_mr *recv_mrs[TRX_RDMA_MR_COUNT];
......
......@@ -59,8 +59,9 @@ int trx_rdma_start(TRXState *s, const TRXDriverParams2 *p) {
for(mr_id = 0; mr_id < TRX_RDMA_MR_COUNT; mr_id++) {
capulet_rdma_ib_post_recv(&sdr_context->ib_ctx, sdr_context->recv_mrs[mr_id], sizeof(struct SDRMemoryRegion), mr_id);
}
usleep(10000);
usleep(1000);
sdr_context->recv_buffer = common_circular_buffer_create(TRX_RDMA_BUFFER_SIZE * sizeof(TRXComplex), false);
return 0;
}
......@@ -138,6 +139,7 @@ int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int
int result;
uint32_t mr_id;
uint32_t byte_count;
uint16_t sample_count;
sdr_context = s->opaque;
......@@ -147,26 +149,35 @@ int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int
// Poll for Recv completion
result = ibv_poll_cq(sdr_context->ib_ctx.recv_cq, 1, &poll_wc);
if(result == 0) return 0;
// Handle errors
if(result < 0 || poll_wc.status != IBV_WC_SUCCESS) {
printf("(Recv) WR %lx failed: %s (%d)\n", poll_wc.wr_id, ibv_wc_status_str(poll_wc.status), poll_wc.vendor_err);
if(result < 0) {
printf("trx_rdma: failed to poll Recv CQ\n");
return 0;
}
mr_id = (uint32_t) (poll_wc.wr_id >> 32);
in_data = (struct SDRMemoryRegion *) sdr_context->recv_mrs[mr_id]->addr;
if(result > 0) {
if(poll_wc.status != IBV_WC_SUCCESS) {
printf("trx_rdma: Recv WR %lx failed: %s (%d)\n", poll_wc.wr_id, ibv_wc_status_str(poll_wc.status), poll_wc.vendor_err);
return 0;
}
mr_id = (uint32_t) (poll_wc.wr_id >> 32);
in_data = (struct SDRMemoryRegion *) sdr_context->recv_mrs[mr_id]->addr;
// The Recv queue should always be full for optimal performances: once
// an element is consumed, push back to it.
capulet_rdma_ib_post_recv(&sdr_context->ib_ctx, sdr_context->recv_mrs[mr_id], sizeof(struct SDRMemoryRegion), mr_id);
// The Recv queue should always be full for optimal performances: once
// an element is consumed, push back to it.
capulet_rdma_ib_post_recv(&sdr_context->ib_ctx, sdr_context->recv_mrs[mr_id], sizeof(struct SDRMemoryRegion), mr_id);
// Copy data from RDMA buffer to local buffer
sample_count = in_data->meta.sample_count;
// Push data onto a circular buffer
sample_count = in_data->meta.sample_count;
common_circular_buffer_write(sdr_context->recv_buffer, (void *) in_data->iq[0], sample_count * sizeof(TRXComplex));
}
memcpy((void *) channels[0], (void *) in_data->iq[0], sample_count * sizeof(TRXComplex));
memcpy((void *) channels[1], (void *) in_data->iq[1], sample_count * sizeof(TRXComplex));
// Copy data from circular buffer to local buffer
// XXX: handle multiple channels
byte_count = common_circular_buffer_read(sdr_context->recv_buffer, (void *) channels[0], count * sizeof(TRXComplex));
if(byte_count != 0) memcpy((void *) channels[1], (void *) channels[0], byte_count);
sample_count = (uint16_t) (byte_count / sizeof(TRXComplex));
// Increment timestamp by IQ samples count
sdr_context->last_timestamp += sample_count;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment