Commit c6b05feb authored by Titouan Soulard's avatar Titouan Soulard

example: add a stress test

parent c7de92cc
...@@ -10,6 +10,9 @@ ...@@ -10,6 +10,9 @@
#include "libcapulet/net_udp.h" #include "libcapulet/net_udp.h"
#include "libcapulet/rdma_ib.h" #include "libcapulet/rdma_ib.h"
#define RDMA_TEST_FILE 1
#define RDMA_TEST_STRESS 2
struct ServeMrThreadContext { struct ServeMrThreadContext {
struct CommonHashtableTable *mr_mgr; struct CommonHashtableTable *mr_mgr;
int server_socket; int server_socket;
...@@ -28,29 +31,34 @@ int main(int argc, char *argv[]) { ...@@ -28,29 +31,34 @@ int main(int argc, char *argv[]) {
struct CapuletNetUdpMrInfoPacket out_info_packet; struct CapuletNetUdpMrInfoPacket out_info_packet;
struct CommonHashtableTable mr_mgr; struct CommonHashtableTable mr_mgr;
struct ServeMrThreadContext serve_mr_td_ctx;
struct ibv_mr *in_mr; struct ibv_mr *in_mr;
struct ibv_mr *out_mr; struct ibv_mr *out_mr;
struct ibv_wc poll_wc; struct ibv_wc poll_wc;
struct addrinfo server_hints; struct addrinfo server_hints;
struct addrinfo *server_infos; struct addrinfo *server_infos;
struct timespec timestamps[2];
pthread_t serve_mr_td; pthread_t serve_mr_td;
struct ServeMrThreadContext serve_mr_td_ctx;
size_t remote_host_size; size_t remote_host_size;
size_t device_name_size; size_t device_name_size;
long elapsed_time;
long max_time;
int allocated_size; int allocated_size;
int server_socket; int server_socket;
int opt; int opt;
char *remote_host; char *remote_host;
char *device_name; char *device_name;
char mode;
char in_mr_fc; char in_mr_fc;
bool is_client; bool is_client;
bool result; bool result;
// Preallocate some variables // Preallocate some variables
allocated_size = 16384 * sizeof(char); allocated_size = 16384 * sizeof(char);
max_time = 0;
remote_host = NULL; remote_host = NULL;
device_name = NULL; device_name = NULL;
is_client = false; is_client = false;
...@@ -65,6 +73,15 @@ int main(int argc, char *argv[]) { ...@@ -65,6 +73,15 @@ int main(int argc, char *argv[]) {
/****************************** /******************************
**** Arguments processing **** **** Arguments processing ****
******************************/ ******************************/
if(strcmp(argv[1], "file") == 0) {
mode = RDMA_TEST_FILE;
} else if(strcmp(argv[1], "stress") == 0) {
mode = RDMA_TEST_STRESS;
} else {
fprintf(stderr, "Invalid command, allowed modes are: file and stress\n");
return -1;
}
while((opt = getopt(argc, argv, "c:d:")) != -1) { while((opt = getopt(argc, argv, "c:d:")) != -1) {
// Detect if a server was given; if any, act as a client // Detect if a server was given; if any, act as a client
if(opt == 'c') { if(opt == 'c') {
...@@ -106,7 +123,6 @@ int main(int argc, char *argv[]) { ...@@ -106,7 +123,6 @@ int main(int argc, char *argv[]) {
return -1; return -1;
} }
// Allocate memory and get user data from STDIN
in_mr = capulet_rdma_ib_create_mr(&rdma_ctx, allocated_size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); in_mr = capulet_rdma_ib_create_mr(&rdma_ctx, allocated_size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
out_mr = capulet_rdma_ib_create_mr(&rdma_ctx, allocated_size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ); out_mr = capulet_rdma_ib_create_mr(&rdma_ctx, allocated_size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);
...@@ -127,29 +143,12 @@ int main(int argc, char *argv[]) { ...@@ -127,29 +143,12 @@ int main(int argc, char *argv[]) {
return -1; return -1;
} }
if(is_client) {
read(STDIN_FILENO, (char *) in_mr->addr, allocated_size);
} else {
read(STDIN_FILENO, (char *) out_mr->addr, allocated_size);
}
result = capulet_rdma_ib_initialize_qp(&rdma_ctx, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE); result = capulet_rdma_ib_initialize_qp(&rdma_ctx, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE);
if(!result) { if(!result) {
fprintf(stderr, "Queue Pair initialization failed\n"); fprintf(stderr, "Queue Pair initialization failed\n");
return -1; return -1;
} }
result = capulet_rdma_ib_post_recv(&rdma_ctx, in_mr, allocated_size);
if(!result) {
fprintf(stderr, "Posting Recv (in) failed\n");
return -1;
}
result = capulet_rdma_ib_post_recv(&rdma_ctx, out_mr, allocated_size);
if(!result) {
fprintf(stderr, "Posting Recv (out) failed\n");
return -1;
}
/****************************** /******************************
**** Exchange informations *** **** Exchange informations ***
...@@ -181,17 +180,35 @@ int main(int argc, char *argv[]) { ...@@ -181,17 +180,35 @@ int main(int argc, char *argv[]) {
capulet_net_udp_dump_qp_packet(udp_ctx->remote); capulet_net_udp_dump_qp_packet(udp_ctx->remote);
result = capulet_rdma_ib_set_peer_from_udp(&rdma_ctx, udp_ctx);
if(!result) {
fprintf(stderr, "Setting state to RTS failed\n");
return -1;
}
if(mode == RDMA_TEST_FILE) {
/****************************** /******************************
******** Read or wait ******** ******** Read or wait ********
******************************/ ******************************/
result = capulet_rdma_ib_set_peer_from_udp(&rdma_ctx, udp_ctx); if(is_client) {
read(STDIN_FILENO, (char *) in_mr->addr, allocated_size);
} else {
read(STDIN_FILENO, (char *) out_mr->addr, allocated_size);
}
result = capulet_rdma_ib_post_recv(&rdma_ctx, in_mr, allocated_size);
if(!result) { if(!result) {
fprintf(stderr, "Setting state to RTS failed\n"); fprintf(stderr, "Posting Recv (in) failed\n");
return -1;
}
result = capulet_rdma_ib_post_recv(&rdma_ctx, out_mr, allocated_size);
if(!result) {
fprintf(stderr, "Posting Recv (out) failed\n");
return -1; return -1;
} }
if(!is_client) { if(!is_client) {
// XXX: check ownership
serve_mr_td_ctx.mr_mgr = &mr_mgr; serve_mr_td_ctx.mr_mgr = &mr_mgr;
serve_mr_td_ctx.server_socket = server_socket; serve_mr_td_ctx.server_socket = server_socket;
...@@ -227,7 +244,7 @@ int main(int argc, char *argv[]) { ...@@ -227,7 +244,7 @@ int main(int argc, char *argv[]) {
******************************/ ******************************/
if(is_client) { if(is_client) {
do { do {
result = ibv_poll_cq(rdma_ctx.recv_cq, 1, &poll_wc); result = ibv_poll_cq(rdma_ctx.send_cq, 1, &poll_wc);
usleep(100); usleep(100);
} while(result == 0); } while(result == 0);
...@@ -249,6 +266,51 @@ int main(int argc, char *argv[]) { ...@@ -249,6 +266,51 @@ int main(int argc, char *argv[]) {
printf("%s\n", (char *) in_mr->addr); printf("%s\n", (char *) in_mr->addr);
} }
} else if(mode == RDMA_TEST_STRESS) {
// Fill the Recv queue
for(int i = 0; i < 16; i++) {
capulet_rdma_ib_post_recv(&rdma_ctx, in_mr, allocated_size);
}
// A small delay is needed to avoid sending anything before Recv requests
// are posted on the other side.
usleep(1000);
while(1) {
clock_gettime(CLOCK_MONOTONIC, &timestamps[0]);
// Wait for Send
capulet_rdma_ib_post_send(&rdma_ctx, IBV_WR_SEND, out_mr, allocated_size, NULL);
do {
result = ibv_poll_cq(rdma_ctx.send_cq, 1, &poll_wc);
} while(result == 0);
if(result < 0 || poll_wc.status != IBV_WC_SUCCESS) {
printf("WR %lu failed: %s (%d)\n", poll_wc.wr_id, ibv_wc_status_str(poll_wc.status), poll_wc.vendor_err);
return -1;
}
// Wait for Recv
do {
result = ibv_poll_cq(rdma_ctx.recv_cq, 1, &poll_wc);
} while(result == 0);
if(result < 0 || poll_wc.status != IBV_WC_SUCCESS) {
printf("WR %lu failed: %s (%d)\n", poll_wc.wr_id, ibv_wc_status_str(poll_wc.status), poll_wc.vendor_err);
return -1;
}
// The Recv queue should always be full for optimal performances: once
// an element is consumed, push back to it.
capulet_rdma_ib_post_recv(&rdma_ctx, in_mr, allocated_size);
clock_gettime(CLOCK_MONOTONIC, &timestamps[1]);
elapsed_time = (timestamps[1].tv_sec - timestamps[0].tv_sec) * 1000000000 + timestamps[1].tv_nsec - timestamps[0].tv_nsec;
if(elapsed_time > max_time) max_time = elapsed_time;
printf("\r%8lu / %8lu", elapsed_time, max_time);
fflush(stdout);
}
}
/****************************** /******************************
******* Global cleanup ******* ******* Global cleanup *******
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment