Commit d32c6fa4 authored by Titouan Soulard's avatar Titouan Soulard

libtrx: fill MR as much as possible

parent 70fad2cb
......@@ -36,6 +36,8 @@ struct SDRContext {
struct ibv_mr *send_mrs[TRX_RDMA_MR_COUNT];
uint64_t send_mrs_status;
int consumed_mr_id;
int consumed_mr_length;
char *server_addr;
char *device_name;
......
......@@ -73,31 +73,47 @@ void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples
struct ibv_wc poll_wc[TRX_RDMA_MR_COUNT];
int mr_id;
int total_written;
int written;
int result;
int mr_id;
int i;
sdr_context = s->opaque;
channels = (TRXComplex **) samples;
total_written = 0;
// First write given bytes to a MR
while(total_written < count) {
// Find a new MR if none was already filled in
if(sdr_context->consumed_mr_id == -1) {
// First set bit (lowest first) corresponds to the first MR available
// Make sure the bit can be found (ie. chack range) to avoid UB
if(sdr_context->send_mrs_status > 0 && sdr_context->send_mrs_status < ((uint64_t) 1 << TRX_RDMA_MR_COUNT)) {
sdr_context->consumed_mr_id = __builtin_ctzl(sdr_context->send_mrs_status);
sdr_context->consumed_mr_length = 0;
sdr_context->send_mrs_status &= ~((uint64_t) 1 << sdr_context->consumed_mr_id);
} else break;
}
if(count <= 0) return;
// Send samples to remote host if an MR is available
if(sdr_context->send_mrs_status > 0 && sdr_context->send_mrs_status < ((uint64_t) 1 << TRX_RDMA_MR_COUNT)) {
// Find first set bit (lowest first): corresponds to the first MR available
mr_id = __builtin_ctzl(sdr_context->send_mrs_status);
out_data = (struct SDRMemoryRegion *) sdr_context->send_mrs[mr_id]->addr;
// Write as many samples as possible to the MR
written = MIN(count - total_written, TRX_RDMA_MR_SIZE - sdr_context->consumed_mr_length);
out_data = (struct SDRMemoryRegion *) sdr_context->send_mrs[sdr_context->consumed_mr_id]->addr;
memcpy((void *) out_data->iq[0], (void *) channels[0], count * sizeof(TRXComplex));
memcpy((void *) out_data->iq[1], (void *) channels[1], count * sizeof(TRXComplex));
out_data->meta.sample_count = count;
memcpy((void *) (out_data->iq[0] + sdr_context->consumed_mr_length), (void *) (channels[0] + total_written), written * sizeof(TRXComplex));
memcpy((void *) (out_data->iq[1] + sdr_context->consumed_mr_length), (void *) (channels[1] + total_written), written * sizeof(TRXComplex));
capulet_rdma_ib_post_send(&sdr_context->ib_ctx, IBV_WR_SEND, sdr_context->send_mrs[mr_id], sizeof(struct SDRMemoryRegion), mr_id, NULL);
sdr_context->send_mrs_status &= ~((uint64_t) 1 << mr_id);
total_written += written;
sdr_context->consumed_mr_length += written;
// When an MR is full, send it, next iteration will find another one
if(sdr_context->consumed_mr_length == TRX_RDMA_MR_SIZE) {
out_data->meta.sample_count = TRX_RDMA_MR_SIZE;
capulet_rdma_ib_post_send(&sdr_context->ib_ctx, IBV_WR_SEND, sdr_context->send_mrs[sdr_context->consumed_mr_id], sizeof(struct SDRMemoryRegion), sdr_context->consumed_mr_id, NULL);
sdr_context->consumed_mr_id = -1;
}
}
// Check for acknowledge to allow sending new frames
// Regardless of data written, poll for completion of Send requests
result = ibv_poll_cq(sdr_context->ib_ctx.send_cq, TRX_RDMA_MR_COUNT, poll_wc);
if(result <= 0) return;
......@@ -106,6 +122,7 @@ void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples
printf("(Send) WR %lx failed: %s (%d)\n", poll_wc[i].wr_id, ibv_wc_status_str(poll_wc[i].status), poll_wc[i].vendor_err);
}
// MR id is encoded in the WR id prefix (32 first bits over 64 total)
mr_id = (uint32_t) (poll_wc[i].wr_id >> 32);
sdr_context->send_mrs_status |= (uint64_t) 1 << mr_id;
}
......@@ -193,6 +210,7 @@ int trx_driver_init(TRXState *s) {
sdr_context->device_name = trx_get_param_string(s, "device_name");
sdr_context->last_timestamp = 0;
sdr_context->send_mrs_status = ((uint64_t) 1 << TRX_RDMA_MR_COUNT) - 1;
sdr_context->consumed_mr_id = -1;
if(!sdr_context->device_name) {
fprintf(stderr, "Missing `device_name` parameter\n");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment