Commit 05e67288 authored by Titouan Soulard's avatar Titouan Soulard

libcapulet/net_udp: allow registering multiple MR

- Split the `bind` call into two different calls
- Client application is responsible for initializing server infos
- Use different UDP packets for QP and MR
parent 9998ee57
......@@ -13,9 +13,13 @@ int main(int argc, char *argv[]) {
struct CapuletRdmaIbContext rdma_ctx;
struct CapuletNetUdpContext *udp_ctx;
struct CapuletRdmaMrMgrElement *main_mr_el;
struct CapuletNetUdpMrInfoPacket mr_info_packet;
struct ibv_wc poll_wc;
struct addrinfo server_hints;
struct addrinfo *server_infos;
int server_socket;
char remote_host[16];
char *memory_char_buffer;
bool result;
......@@ -36,6 +40,18 @@ int main(int argc, char *argv[]) {
}
}
// Set up remote address
if(is_client) {
memset(&server_hints, 0, sizeof(struct addrinfo));
server_hints.ai_family = AF_INET;
server_hints.ai_socktype = SOCK_DGRAM;
if(getaddrinfo(remote_host, "7362", &server_hints, &server_infos) != 0) {
fprintf(stderr, "getaddrinfo error\n");
return false;
}
}
/******************************
***** RDMA initialization ****
******************************/
......@@ -79,26 +95,26 @@ int main(int argc, char *argv[]) {
return -1;
}
result = capulet_rdma_ib_fill_base_udp(&rdma_ctx, main_mr_el->mr, udp_ctx->local);
result = capulet_rdma_ib_fill_base_udp(&rdma_ctx, udp_ctx->local);
if(!result) {
fprintf(stderr, "Query port failed\n");
return -1;
}
capulet_net_udp_dump_packet(udp_ctx->local);
capulet_net_udp_dump_qp_packet(udp_ctx->local);
if(is_client) {
result = capulet_net_udp_connect(udp_ctx, remote_host);
server_socket = capulet_net_udp_send_qp(udp_ctx, server_infos);
} else {
result = capulet_net_udp_bind(udp_ctx);
server_socket = capulet_net_udp_serve_qp(udp_ctx);
}
if(!result) {
if(server_socket == -1) {
fprintf(stderr, "Connection to peer failed\n");
return -1;
}
capulet_net_udp_dump_packet(udp_ctx->remote);
capulet_net_udp_dump_qp_packet(udp_ctx->remote);
/******************************
******** Read or wait ********
......@@ -109,14 +125,27 @@ int main(int argc, char *argv[]) {
return -1;
}
if(is_client) {
result = capulet_rdma_ib_send_read(&rdma_ctx, udp_ctx->remote, main_mr_el->mr, allocated_size);
if(!result) {
fprintf(stderr, "Sending Read failed\n");
return -1;
}
if(!is_client) {
capulet_net_udp_serve_mr(server_socket);
// Reached only on error: serve_mr is looping forever
return -1;
}
result = capulet_net_udp_query_mr(server_socket, server_infos, "main", &mr_info_packet);
if(!result || (strcmp(mr_info_packet.name, "main") != 0)) {
fprintf(stderr, "Querying MR from server failed\n");
return -1;
}
result = capulet_rdma_ib_send_read(&rdma_ctx, &mr_info_packet, main_mr_el->mr, allocated_size);
if(!result) {
fprintf(stderr, "Sending Read failed\n");
return -1;
}
/******************************
****** Poll completion *******
******************************/
do {
result = ibv_poll_cq(rdma_ctx.cq, 1, &poll_wc);
} while(result == 0);
......@@ -133,7 +162,9 @@ int main(int argc, char *argv[]) {
******* Global cleanup *******
******************************/
capulet_net_udp_free(udp_ctx);
capulet_rdma_mr_mgr_free(main_mr_el);
capulet_rdma_ib_free(&rdma_ctx);
freeaddrinfo(server_infos);
return 0;
}
......
......@@ -10,24 +10,38 @@
#include <sys/types.h>
#include <sys/socket.h>
struct CapuletNetUdpPacket {
#define CAP_QP_INFO_PTYPE 0x01
#define CAP_MR_QUERY_PTYPE 0x10
#define CAP_MR_INFO_PTYPE 0x11
struct CapuletNetUdpQpInfoPacket {
uint16_t lid;
uint32_t qpn;
uint32_t psn;
uint8_t gid[16];
};
struct CapuletNetUdpMrQueryPacket {
char name[16];
};
struct CapuletNetUdpMrInfoPacket {
char name[16];
void *addr;
size_t size;
uint32_t key;
};
struct CapuletNetUdpContext {
struct CapuletNetUdpPacket *local;
struct CapuletNetUdpPacket *remote;
struct CapuletNetUdpQpInfoPacket *local;
struct CapuletNetUdpQpInfoPacket *remote;
};
struct CapuletNetUdpContext *capulet_net_udp_initialize();
bool capulet_net_udp_bind(struct CapuletNetUdpContext *ctx);
bool capulet_net_udp_connect(struct CapuletNetUdpContext *ctx, const char *hostname);
void capulet_net_udp_dump_packet(struct CapuletNetUdpPacket *packet);
int capulet_net_udp_serve_qp(struct CapuletNetUdpContext *ctx);
bool capulet_net_udp_serve_mr(int server_socket);
int capulet_net_udp_send_qp(struct CapuletNetUdpContext *ctx, struct addrinfo *server_infos);
bool capulet_net_udp_query_mr(int server_socket, struct addrinfo *server_infos, char *name, struct CapuletNetUdpMrInfoPacket *mr_info_packet);
void capulet_net_udp_dump_qp_packet(struct CapuletNetUdpQpInfoPacket *packet);
void capulet_net_udp_free(struct CapuletNetUdpContext *ctx);
......@@ -17,9 +17,9 @@ struct CapuletRdmaIbContext {
bool capulet_rdma_ib_initialize_device(struct CapuletRdmaIbContext *ctx, const char *device_name);
bool capulet_rdma_ib_initialize_qp(struct CapuletRdmaIbContext *ctx, int access);
bool capulet_rdma_ib_fill_base_udp(struct CapuletRdmaIbContext *ctx, struct ibv_mr *mr, struct CapuletNetUdpPacket *own);
bool capulet_rdma_ib_fill_base_udp(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpQpInfoPacket *own);
bool capulet_rdma_ib_set_peer_from_udp(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpContext *udp_ctx);
bool capulet_rdma_ib_post_recv(struct CapuletRdmaIbContext *ctx, struct ibv_mr *ibv_dev_mr, int mr_size);
bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpPacket *peer, struct ibv_mr *ibv_dev_mr, int mr_size);
bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpMrInfoPacket *remote_mr, struct ibv_mr *local_mr, int local_mr_size);
void capulet_rdma_ib_free(struct CapuletRdmaIbContext *ctx);
......@@ -12,4 +12,5 @@ struct CapuletRdmaMrMgrElement {
struct CapuletRdmaMrMgrElement *capulet_rdma_mr_mgr_register(struct CapuletRdmaIbContext *ib_ctx, char *name, size_t size, int flags);
struct CapuletRdmaMrMgrElement *capulet_rdma_mr_mgr_find(char *name);
void capulet_rdma_mr_mgr_free(struct CapuletRdmaMrMgrElement *el);
#include "include/net_udp.h"
#include "include/rdma_mr_mgr.h"
struct CapuletNetUdpContext *capulet_net_udp_initialize() {
struct CapuletNetUdpContext *ctx;
void *raw_ptr;
const size_t packet_size = sizeof(struct CapuletNetUdpPacket);
const size_t packet_size = sizeof(struct CapuletNetUdpQpInfoPacket);
const size_t ctx_size = sizeof(struct CapuletNetUdpContext);
const size_t total_size = 2 * packet_size + ctx_size;
......@@ -16,13 +17,13 @@ struct CapuletNetUdpContext *capulet_net_udp_initialize() {
memset(raw_ptr, 0, total_size);
ctx = (struct CapuletNetUdpContext *) raw_ptr;
ctx->local = (struct CapuletNetUdpPacket *) (raw_ptr + ctx_size);
ctx->remote = (struct CapuletNetUdpPacket *) (raw_ptr + ctx_size + packet_size);
ctx->local = (struct CapuletNetUdpQpInfoPacket *) (raw_ptr + ctx_size);
ctx->remote = (struct CapuletNetUdpQpInfoPacket *) (raw_ptr + ctx_size + packet_size);
return ctx;
}
bool capulet_net_udp_bind(struct CapuletNetUdpContext *ctx) {
int capulet_net_udp_serve_qp(struct CapuletNetUdpContext *ctx) {
struct addrinfo server_hints;
struct addrinfo *server_infos;
struct sockaddr client_addr;
......@@ -30,12 +31,10 @@ bool capulet_net_udp_bind(struct CapuletNetUdpContext *ctx) {
int result;
int server_socket;
const size_t packet_size = sizeof(struct CapuletNetUdpPacket);
const size_t ctx_size = sizeof(struct CapuletNetUdpContext);
const size_t total_size = 2 * packet_size + ctx_size;
size_t buffer_size = sizeof(struct CapuletNetUdpQpInfoPacket);
uint8_t receive_buffer[buffer_size];
socklen_t client_addr_size = sizeof(struct sockaddr_storage);
char receive_buffer[total_size];
// Set up bind address
memset(&server_hints, 0, sizeof(struct addrinfo));
......@@ -46,7 +45,7 @@ bool capulet_net_udp_bind(struct CapuletNetUdpContext *ctx) {
result = getaddrinfo(NULL, "7362", &server_hints, &server_infos);
if(result != 0) {
fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(result));
return false;
return -1;
}
// Create a socket an bind
......@@ -54,31 +53,53 @@ bool capulet_net_udp_bind(struct CapuletNetUdpContext *ctx) {
server_socket = socket(server_infos->ai_family, server_infos->ai_socktype, server_infos->ai_protocol);
if(server_socket == -1) {
perror("socket error");
return false;
return -1;
}
result = bind(server_socket, server_infos->ai_addr, server_infos->ai_addrlen);
if(result == -1) {
perror("bind error");
return false;
return -1;
}
// Address info can be freed as soon as the socket in bound
freeaddrinfo(server_infos);
// XXX: blocking call in an user function
result = recvfrom(server_socket, receive_buffer, total_size, 0, &client_addr, &client_addr_size);
if(result == -1) {
perror("recvfrom error");
return false;
}
recvfrom(server_socket, receive_buffer, buffer_size, 0, &client_addr, &client_addr_size);
memcpy((void *) ctx->remote, (void *) receive_buffer, buffer_size);
sendto(server_socket, (void *) ctx->local, buffer_size, MSG_CONFIRM, &client_addr, client_addr_size);
memcpy((void *) ctx->remote, (void *) receive_buffer, total_size);
return server_socket;
}
result = sendto(server_socket, (void *) ctx->local, total_size, MSG_CONFIRM, &client_addr, client_addr_size);
if(result == -1) {
perror("sendto error");
return false;
bool capulet_net_udp_serve_mr(int server_socket) {
struct sockaddr client_addr;
socklen_t client_addr_size = sizeof(struct sockaddr_storage);
struct CapuletNetUdpMrQueryPacket mr_query_pkt;
struct CapuletNetUdpMrInfoPacket mr_info_pkt;
struct CapuletRdmaMrMgrElement *mr_el;
size_t buffer_size;
uint8_t receive_buffer[256];
while(1) {
buffer_size = sizeof(struct CapuletNetUdpMrQueryPacket);
recvfrom(server_socket, receive_buffer, buffer_size, 0, &client_addr, &client_addr_size);
memcpy((void *) &mr_query_pkt, (void *) receive_buffer, buffer_size);
buffer_size = sizeof(struct CapuletNetUdpMrInfoPacket);
memset(&mr_info_pkt, 0, buffer_size);
mr_el = capulet_rdma_mr_mgr_find(mr_query_pkt.name);
if(mr_el) {
memcpy((void *) mr_info_pkt.name, (void *) mr_query_pkt.name, 16);
mr_info_pkt.addr = mr_el->mr->addr;
mr_info_pkt.size = mr_el->mr->length;
mr_info_pkt.key = mr_el->mr->rkey;
}
sendto(server_socket, (void *) &mr_info_pkt, buffer_size, MSG_CONFIRM, &client_addr, client_addr_size);
}
close(server_socket);
......@@ -86,69 +107,74 @@ bool capulet_net_udp_bind(struct CapuletNetUdpContext *ctx) {
return true;
}
bool capulet_net_udp_connect(struct CapuletNetUdpContext *ctx, const char *hostname) {
struct addrinfo server_hints;
struct addrinfo *server_infos;
int result;
int capulet_net_udp_send_qp(struct CapuletNetUdpContext *ctx, struct addrinfo *server_infos) {
int server_socket;
int result;
const size_t packet_size = sizeof(struct CapuletNetUdpPacket);
const size_t ctx_size = sizeof(struct CapuletNetUdpContext);
const size_t total_size = 2 * packet_size + ctx_size;
char receive_buffer[total_size];
// Set up remote address
memset(&server_hints, 0, sizeof(struct addrinfo));
server_hints.ai_family = AF_INET;
server_hints.ai_socktype = SOCK_DGRAM;
result = getaddrinfo(hostname, "7362", &server_hints, &server_infos);
if(result != 0) {
fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(result));
return false;
}
const size_t buffer_size = sizeof(struct CapuletNetUdpQpInfoPacket);
uint8_t receive_buffer[buffer_size];
// Create a socket
// XXX: assumes first address in linked list is 0.0.0.0
server_socket = socket(server_infos->ai_family, server_infos->ai_socktype, server_infos->ai_protocol);
if(server_socket == -1) {
perror("socket error");
return false;
return -1;
}
// Create and send a query with full capabilities
result = sendto(server_socket, (void *) ctx->local, total_size, 0, server_infos->ai_addr, server_infos->ai_addrlen);
result = sendto(server_socket, (void *) ctx->local, buffer_size, 0, server_infos->ai_addr, server_infos->ai_addrlen);
if(result == -1) {
perror("sendto error");
return false;
return -1;
}
// Wait for informations from the server
// XXX: blocking call in an user function
result = recvfrom(server_socket, receive_buffer, total_size, 0, server_infos->ai_addr, &server_infos->ai_addrlen);
result = recvfrom(server_socket, receive_buffer, buffer_size, 0, server_infos->ai_addr, &server_infos->ai_addrlen);
if(result == -1) {
perror("recvfrom error");
return -1;
}
memcpy((void *) ctx->remote, (void *) receive_buffer, buffer_size);
return server_socket;
}
bool capulet_net_udp_query_mr(int server_socket, struct addrinfo *server_infos, char *name, struct CapuletNetUdpMrInfoPacket *mr_info_packet) {
struct CapuletNetUdpMrQueryPacket mr_query_packet;
size_t buffer_size = sizeof(struct CapuletNetUdpMrInfoPacket);
int result;
uint8_t receive_buffer[buffer_size];
memcpy((void *) mr_query_packet.name, (void *) name, 16);
result = sendto(server_socket, (void *) &mr_query_packet, sizeof(struct CapuletNetUdpMrQueryPacket), 0, server_infos->ai_addr, server_infos->ai_addrlen);
if(result == -1) {
perror("sendto error");
return false;
}
memcpy((void *) ctx->remote, (void *) receive_buffer, total_size);
// Wait for informations from the server
result = recvfrom(server_socket, receive_buffer, buffer_size, 0, server_infos->ai_addr, &server_infos->ai_addrlen);
if(result == -1) {
perror("recvfrom error");
return false;
}
close(server_socket);
freeaddrinfo(server_infos);
memcpy((void *) mr_info_packet, (void *) receive_buffer, buffer_size);
return true;
}
void capulet_net_udp_dump_packet(struct CapuletNetUdpPacket *packet) {
void capulet_net_udp_dump_qp_packet(struct CapuletNetUdpQpInfoPacket *packet) {
printf("LID: 0x%x | QPN: 0x%x | PSN: 0x%x | GID: ", packet->lid, packet->qpn, packet->psn);
for(char i = 0; i < 8; i++) {
printf("%x%x ", packet->gid[2 * i], packet->gid[2 * i + 1]);
}
printf("\naddr: %p | size: %ld | key: 0x%x\n", packet->addr, packet->size, packet->key);
printf("\n");
}
void capulet_net_udp_free(struct CapuletNetUdpContext *ctx) {
......
......@@ -70,9 +70,6 @@ bool capulet_rdma_ib_initialize_qp(struct CapuletRdmaIbContext *ctx, int access)
return false;
}
// XXX: added to try and make things work
//ibv_query_qp(ibv_dev_qp, &ibv_dev_qp_params, IBV_QP_CAP, &ibv_dev_qp_request);
// Definition of QP state to "Initialized" and access control
ibv_dev_qp_params.qp_state = IBV_QPS_INIT;
ibv_dev_qp_params.pkey_index = 0;
......@@ -89,7 +86,7 @@ bool capulet_rdma_ib_initialize_qp(struct CapuletRdmaIbContext *ctx, int access)
return true;
}
bool capulet_rdma_ib_fill_base_udp(struct CapuletRdmaIbContext *ctx, struct ibv_mr *mr, struct CapuletNetUdpPacket *own) {
bool capulet_rdma_ib_fill_base_udp(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpQpInfoPacket *own) {
struct ibv_port_attr ibv_dev_attr;
union ibv_gid ibv_dev_gid;
......@@ -111,10 +108,6 @@ bool capulet_rdma_ib_fill_base_udp(struct CapuletRdmaIbContext *ctx, struct ibv_
own->qpn = ctx->qp->qp_num;
own->psn = lrand48() & 0xffffff;
own->addr = mr->addr;
own->size = mr->length;
own->key = mr->rkey;
return true;
}
......@@ -196,7 +189,7 @@ bool capulet_rdma_ib_post_recv(struct CapuletRdmaIbContext *ctx, struct ibv_mr *
return ibv_dev_bad_wr == NULL;
}
bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpPacket *peer, struct ibv_mr *ibv_dev_mr, int mr_size) {
bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpMrInfoPacket *remote_mr, struct ibv_mr *local_mr, int local_mr_size) {
struct ibv_send_wr ibv_dev_rdma_wr;
struct ibv_sge ibv_dev_sge;
struct ibv_send_wr *ibv_dev_bad_wr = NULL;
......@@ -206,9 +199,9 @@ bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletN
memset(&ibv_dev_sge, 0, sizeof(struct ibv_sge));
memset(&ibv_dev_rdma_wr, 0, sizeof(struct ibv_send_wr));
ibv_dev_sge.addr = (uintptr_t) ibv_dev_mr->addr;
ibv_dev_sge.length = mr_size;
ibv_dev_sge.lkey = ibv_dev_mr->lkey;
ibv_dev_sge.addr = (uintptr_t) local_mr->addr;
ibv_dev_sge.length = local_mr_size;
ibv_dev_sge.lkey = local_mr->lkey;
ibv_dev_rdma_wr.wr_id = (uint64_t) rand();
ibv_dev_rdma_wr.sg_list = &ibv_dev_sge;
......@@ -216,8 +209,8 @@ bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletN
ibv_dev_rdma_wr.opcode = IBV_WR_RDMA_READ;
ibv_dev_rdma_wr.send_flags = IBV_SEND_SIGNALED;
ibv_dev_rdma_wr.wr.rdma.remote_addr = (uintptr_t) peer->addr;
ibv_dev_rdma_wr.wr.rdma.rkey = peer->key;
ibv_dev_rdma_wr.wr.rdma.remote_addr = (uintptr_t) remote_mr->addr;
ibv_dev_rdma_wr.wr.rdma.rkey = remote_mr->key;
result = ibv_post_send(ctx->qp, &ibv_dev_rdma_wr, &ibv_dev_bad_wr);
......
......@@ -83,3 +83,10 @@ struct CapuletRdmaMrMgrElement *capulet_rdma_mr_mgr_find(char *name) {
return NULL;
}
void capulet_rdma_mr_mgr_free(struct CapuletRdmaMrMgrElement *el) {
free(el->mr->addr);
ibv_dereg_mr(el->mr);
free(el->name);
free(el);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment