Commit 7eb75d32 authored by Titouan Soulard's avatar Titouan Soulard

libtrx: switch to new TRX API functions

parent f1384977
#include "libtrx/trx_rdma.h" #include "libtrx/trx_rdma.h"
// XXX: global variable to be removed
trx_timestamp_t global_timestamp = 0;
void *serve_mr_td_fn(void *raw_ctx) { void *serve_mr_td_fn(void *raw_ctx) {
struct SDRServeMrThreadContext *ctx = (struct SDRServeMrThreadContext *) raw_ctx; struct SDRServeMrThreadContext *ctx = (struct SDRServeMrThreadContext *) raw_ctx;
capulet_net_udp_serve_mr(ctx->mr_mgr, ctx->server_socket); capulet_net_udp_serve_mr(ctx->mr_mgr, ctx->server_socket);
...@@ -7,7 +10,7 @@ void *serve_mr_td_fn(void *raw_ctx) { ...@@ -7,7 +10,7 @@ void *serve_mr_td_fn(void *raw_ctx) {
return raw_ctx; return raw_ctx;
} }
static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) { int trx_rdma_start(TRXState *s, const TRXDriverParams2 *p) {
struct SDRContext *sdr_context; struct SDRContext *sdr_context;
struct SDRServeMrThreadContext serve_mr_td_ctx; struct SDRServeMrThreadContext serve_mr_td_ctx;
...@@ -83,24 +86,24 @@ static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) { ...@@ -83,24 +86,24 @@ static int trx_rdma_start(TRXState *s, const TRXDriverParams *p) {
if(!result) return -1; if(!result) return -1;
if(sdr_context->server_addr) { if(sdr_context->server_addr) {
capulet_net_udp_query_mr(server_socket, server_infos, "in", &sdr_context->in_remote);
if(!result || (strcmp(sdr_context->in_remote.name, "in") != 0)) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "out", &sdr_context->out_remote);
if(!result || (strcmp(sdr_context->out_remote.name, "out") != 0)) return -1;
} else {
// XXX: check ownership // XXX: check ownership
serve_mr_td_ctx.mr_mgr = &sdr_context->mr_mgr; serve_mr_td_ctx.mr_mgr = &sdr_context->mr_mgr;
serve_mr_td_ctx.server_socket = server_socket; serve_mr_td_ctx.server_socket = server_socket;
pthread_create(&serve_mr_td, NULL, *serve_mr_td_fn, (void *) &serve_mr_td_ctx); pthread_create(&serve_mr_td, NULL, *serve_mr_td_fn, (void *) &serve_mr_td_ctx);
capulet_net_udp_serve_mr(&sdr_context->mr_mgr, server_socket); capulet_net_udp_serve_mr(&sdr_context->mr_mgr, server_socket);
} else {
capulet_net_udp_query_mr(server_socket, server_infos, "in", &sdr_context->in_remote);
if(!result || (strcmp(sdr_context->in_remote.name, "in") != 0)) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "out", &sdr_context->out_remote);
if(!result || (strcmp(sdr_context->out_remote.name, "out") != 0)) return -1;
} }
freeaddrinfo(server_infos); freeaddrinfo(server_infos);
return 0; return 0;
} }
static void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int flags, int rf_port_index) { void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int tx_port_index, TRXWriteMetadata *md) {
struct SDRContext *sdr_context; struct SDRContext *sdr_context;
struct SDRMemoryRegion *out_local; struct SDRMemoryRegion *out_local;
struct CommonHashtableElement *out_local_mr_el; struct CommonHashtableElement *out_local_mr_el;
...@@ -124,7 +127,7 @@ static void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void ** ...@@ -124,7 +127,7 @@ static void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **
} }
} }
static int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int count, int rf_port) { int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int count, int rx_port_index, TRXReadMetadata *md) {
struct SDRContext *sdr_context; struct SDRContext *sdr_context;
struct SDRMemoryRegion *in_local; struct SDRMemoryRegion *in_local;
struct CommonHashtableElement *in_local_mr_el; struct CommonHashtableElement *in_local_mr_el;
...@@ -157,12 +160,11 @@ static int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psampl ...@@ -157,12 +160,11 @@ static int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psampl
mr_length = in_local->meta.mr_length; mr_length = in_local->meta.mr_length;
in_local->meta.mr_length = 0; in_local->meta.mr_length = 0;
return mr_length; // Increment timestamp
} global_timestamp += count;
*ptimestamp = global_timestamp * 32;
static int trx_rdma_get_sample_rate(TRXState *s, TRXFraction *psample_rate, int *psample_rate_num, int sample_rate_min) { return mr_length;
// -1 means not implemented
return -1;
} }
static void trx_rdma_end(TRXState *s) { static void trx_rdma_end(TRXState *s) {
...@@ -210,10 +212,9 @@ int trx_driver_init(TRXState *s) { ...@@ -210,10 +212,9 @@ int trx_driver_init(TRXState *s) {
} }
s->opaque = sdr_context; s->opaque = sdr_context;
s->trx_start_func = trx_rdma_start; s->trx_start_func2 = trx_rdma_start;
s->trx_write_func = trx_rdma_write; s->trx_write_func2 = trx_rdma_write;
s->trx_read_func = trx_rdma_read; s->trx_read_func2 = trx_rdma_read;
s->trx_get_sample_rate_func = trx_rdma_get_sample_rate;
s->trx_end_func = trx_rdma_end; s->trx_end_func = trx_rdma_end;
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment