Commit 007b350a authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-5.14' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This is a major dlm networking enhancement that adds message
  retransmission so that the dlm can reliably continue operating when
  network connections fail and nodes reconnect.

  Previously, this would result in lost messages which could only be
  handled as a node failure"

* tag 'dlm-5.14' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: (26 commits)
  fs: dlm: invalid buffer access in lookup error
  fs: dlm: fix race in mhandle deletion
  fs: dlm: rename socket and app buffer defines
  fs: dlm: introduce proto values
  fs: dlm: move dlm allow conn
  fs: dlm: use alloc_ordered_workqueue
  fs: dlm: fix memory leak when fenced
  fs: dlm: fix lowcomms_start error case
  fs: dlm: Fix spelling mistake "stucked" -> "stuck"
  fs: dlm: Fix memory leak of object mh
  fs: dlm: don't allow half transmitted messages
  fs: dlm: add midcomms debugfs functionality
  fs: dlm: add reliable connection if reconnect
  fs: dlm: add union in dlm header for lockspace id
  fs: dlm: move out some hash functionality
  fs: dlm: add functionality to re-transmit a message
  fs: dlm: make buffer handling per msg
  fs: dlm: add more midcomms hooks
  fs: dlm: public header in out utility
  fs: dlm: fix connection tcp EOF handling
  ...
parents 8418dabd 957adb68
......@@ -20,6 +20,7 @@
#include <net/sock.h>
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"
/*
......@@ -79,6 +80,9 @@ struct dlm_cluster {
unsigned int cl_new_rsb_count;
unsigned int cl_recover_callbacks;
char cl_cluster_name[DLM_LOCKSPACE_LEN];
struct dlm_spaces *sps;
struct dlm_comms *cms;
};
static struct dlm_cluster *config_item_to_cluster(struct config_item *i)
......@@ -204,7 +208,7 @@ static int dlm_check_zero(unsigned int x)
static int dlm_check_buffer_size(unsigned int x)
{
if (x < DEFAULT_BUFFER_SIZE)
if (x < DLM_MAX_SOCKET_BUFSIZE)
return -EINVAL;
return 0;
......@@ -409,6 +413,9 @@ static struct config_group *make_cluster(struct config_group *g,
if (!cl || !sps || !cms)
goto fail;
cl->sps = sps;
cl->cms = cms;
config_group_init_type_name(&cl->group, name, &cluster_type);
config_group_init_type_name(&sps->ss_group, "spaces", &spaces_type);
config_group_init_type_name(&cms->cs_group, "comms", &comms_type);
......@@ -458,6 +465,9 @@ static void drop_cluster(struct config_group *g, struct config_item *i)
static void release_cluster(struct config_item *i)
{
struct dlm_cluster *cl = config_item_to_cluster(i);
kfree(cl->sps);
kfree(cl->cms);
kfree(cl);
}
......@@ -532,7 +542,7 @@ static void drop_comm(struct config_group *g, struct config_item *i)
struct dlm_comm *cm = config_item_to_comm(i);
if (local_comm == cm)
local_comm = NULL;
dlm_lowcomms_close(cm->nodeid);
dlm_midcomms_close(cm->nodeid);
while (cm->addr_count--)
kfree(cm->addr[cm->addr_count]);
config_item_put(i);
......@@ -942,7 +952,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_SCAN_SECS 5
#define DEFAULT_LOG_DEBUG 0
#define DEFAULT_LOG_INFO 1
#define DEFAULT_PROTOCOL 0
#define DEFAULT_PROTOCOL DLM_PROTO_TCP
#define DEFAULT_MARK 0
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US 0
......@@ -952,7 +962,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
.ci_buffer_size = DEFAULT_BUFFER_SIZE,
.ci_buffer_size = DLM_MAX_SOCKET_BUFSIZE,
.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
.ci_recover_timer = DEFAULT_RECOVER_TIMER,
.ci_toss_secs = DEFAULT_TOSS_SECS,
......
......@@ -12,7 +12,7 @@
#ifndef __CONFIG_DOT_H__
#define __CONFIG_DOT_H__
#define DEFAULT_BUFFER_SIZE 4096
#define DLM_MAX_SOCKET_BUFSIZE 4096
struct dlm_config_node {
int nodeid;
......@@ -23,6 +23,9 @@ struct dlm_config_node {
#define DLM_MAX_ADDR_COUNT 3
#define DLM_PROTO_TCP 0
#define DLM_PROTO_SCTP 1
struct dlm_config_info {
int ci_tcp_port;
int ci_buffer_size;
......
......@@ -16,6 +16,7 @@
#include <linux/slab.h>
#include "dlm_internal.h"
#include "midcomms.h"
#include "lock.h"
#define DLM_DEBUG_BUF_LEN 4096
......@@ -23,6 +24,7 @@ static char debug_buf[DLM_DEBUG_BUF_LEN];
static struct mutex debug_buf_lock;
static struct dentry *dlm_root;
static struct dentry *dlm_comms;
static char *print_lockmode(int mode)
{
......@@ -738,6 +740,57 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
debugfs_remove(ls->ls_debug_toss_dentry);
}
static int dlm_state_show(struct seq_file *file, void *offset)
{
seq_printf(file, "%s\n", dlm_midcomms_state(file->private));
return 0;
}
DEFINE_SHOW_ATTRIBUTE(dlm_state);
static int dlm_flags_show(struct seq_file *file, void *offset)
{
seq_printf(file, "%lu\n", dlm_midcomms_flags(file->private));
return 0;
}
DEFINE_SHOW_ATTRIBUTE(dlm_flags);
static int dlm_send_queue_cnt_show(struct seq_file *file, void *offset)
{
seq_printf(file, "%d\n", dlm_midcomms_send_queue_cnt(file->private));
return 0;
}
DEFINE_SHOW_ATTRIBUTE(dlm_send_queue_cnt);
static int dlm_version_show(struct seq_file *file, void *offset)
{
seq_printf(file, "0x%08x\n", dlm_midcomms_version(file->private));
return 0;
}
DEFINE_SHOW_ATTRIBUTE(dlm_version);
void *dlm_create_debug_comms_file(int nodeid, void *data)
{
struct dentry *d_node;
char name[256];
memset(name, 0, sizeof(name));
snprintf(name, 256, "%d", nodeid);
d_node = debugfs_create_dir(name, dlm_comms);
debugfs_create_file("state", 0444, d_node, data, &dlm_state_fops);
debugfs_create_file("flags", 0444, d_node, data, &dlm_flags_fops);
debugfs_create_file("send_queue_count", 0444, d_node, data,
&dlm_send_queue_cnt_fops);
debugfs_create_file("version", 0444, d_node, data, &dlm_version_fops);
return d_node;
}
void dlm_delete_debug_comms_file(void *ctx)
{
debugfs_remove(ctx);
}
void dlm_create_debug_file(struct dlm_ls *ls)
{
char name[DLM_LOCKSPACE_LEN + 8];
......@@ -797,6 +850,7 @@ void __init dlm_register_debugfs(void)
{
mutex_init(&debug_buf_lock);
dlm_root = debugfs_create_dir("dlm", NULL);
dlm_comms = debugfs_create_dir("comms", dlm_root);
}
void dlm_unregister_debugfs(void)
......
......@@ -57,9 +57,12 @@ struct dlm_header;
struct dlm_message;
struct dlm_rcom;
struct dlm_mhandle;
struct dlm_msg;
#define log_print(fmt, args...) \
printk(KERN_ERR "dlm: "fmt"\n" , ##args)
#define log_print_ratelimited(fmt, args...) \
printk_ratelimited(KERN_ERR "dlm: "fmt"\n", ##args)
#define log_error(ls, fmt, args...) \
printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args)
......@@ -368,23 +371,33 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
/* dlm_header is first element of all structs sent between nodes */
#define DLM_HEADER_MAJOR 0x00030000
#define DLM_HEADER_MINOR 0x00000001
#define DLM_HEADER_MINOR 0x00000002
#define DLM_VERSION_3_1 0x00030001
#define DLM_VERSION_3_2 0x00030002
#define DLM_HEADER_SLOTS 0x00000001
#define DLM_MSG 1
#define DLM_RCOM 2
#define DLM_OPTS 3
#define DLM_ACK 4
#define DLM_FIN 5
struct dlm_header {
uint32_t h_version;
union {
/* for DLM_MSG and DLM_RCOM */
uint32_t h_lockspace;
/* for DLM_ACK and DLM_OPTS */
uint32_t h_seq;
} u;
uint32_t h_nodeid; /* nodeid of sender */
uint16_t h_length;
uint8_t h_cmd; /* DLM_MSG, DLM_RCOM */
uint8_t h_pad;
};
#define DLM_MSG_REQUEST 1
#define DLM_MSG_CONVERT 2
#define DLM_MSG_UNLOCK 3
......@@ -452,10 +465,29 @@ struct dlm_rcom {
char rc_buf[];
};
struct dlm_opt_header {
uint16_t t_type;
uint16_t t_length;
uint32_t o_pad;
/* need to be 8 byte aligned */
char t_value[];
};
/* encapsulation header */
struct dlm_opts {
struct dlm_header o_header;
uint8_t o_nextcmd;
uint8_t o_pad;
uint16_t o_optlen;
uint32_t o_pad2;
char o_opts[];
};
union dlm_packet {
struct dlm_header header; /* common to other two */
struct dlm_message message;
struct dlm_rcom rcom;
struct dlm_opts opts;
};
#define DLM_RSF_NEED_SLOTS 0x00000001
......@@ -722,11 +754,15 @@ void dlm_register_debugfs(void);
void dlm_unregister_debugfs(void);
void dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
void *dlm_create_debug_comms_file(int nodeid, void *data);
void dlm_delete_debug_comms_file(void *ctx);
#else
static inline void dlm_register_debugfs(void) { }
static inline void dlm_unregister_debugfs(void) { }
static inline void dlm_create_debug_file(struct dlm_ls *ls) { }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
static inline void *dlm_create_debug_comms_file(int nodeid, void *data) { return NULL; }
static inline void dlm_delete_debug_comms_file(void *ctx) { }
#endif
#endif /* __DLM_INTERNAL_DOT_H__ */
......
......@@ -59,7 +59,7 @@
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
......@@ -3534,17 +3534,17 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
char *mb;
/* get_buffer gives us a message handle (mh) that we need to
pass into lowcomms_commit and a message buffer (mb) that we
pass into midcomms_commit and a message buffer (mb) that we
write our data into */
mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
if (!mh)
return -ENOBUFS;
ms = (struct dlm_message *) mb;
ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
ms->m_header.h_lockspace = ls->ls_global_id;
ms->m_header.u.h_lockspace = ls->ls_global_id;
ms->m_header.h_nodeid = dlm_our_nodeid();
ms->m_header.h_length = mb_len;
ms->m_header.h_cmd = DLM_MSG;
......@@ -3589,7 +3589,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
dlm_message_out(ms);
dlm_lowcomms_commit_buffer(mh);
dlm_midcomms_commit_mhandle(mh);
return 0;
}
......@@ -5038,16 +5038,16 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
if (hd->h_nodeid != nodeid) {
log_print("invalid h_nodeid %d from %d lockspace %x",
hd->h_nodeid, nodeid, hd->h_lockspace);
hd->h_nodeid, nodeid, hd->u.h_lockspace);
return;
}
ls = dlm_find_lockspace_global(hd->h_lockspace);
ls = dlm_find_lockspace_global(hd->u.h_lockspace);
if (!ls) {
if (dlm_config.ci_log_debug) {
printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
"%u from %d cmd %d type %d\n",
hd->h_lockspace, nodeid, hd->h_cmd, type);
hd->u.h_lockspace, nodeid, hd->h_cmd, type);
}
if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
......
......@@ -16,6 +16,7 @@
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
......@@ -390,7 +391,7 @@ static int threads_start(void)
}
/* Thread for sending/receiving messages for all lockspace's */
error = dlm_lowcomms_start();
error = dlm_midcomms_start();
if (error) {
log_print("cannot start dlm lowcomms %d", error);
goto scand_fail;
......@@ -566,7 +567,12 @@ static int new_lockspace(const char *name, const char *cluster,
mutex_init(&ls->ls_requestqueue_mutex);
mutex_init(&ls->ls_clear_proc_locks);
ls->ls_recover_buf = kmalloc(LOWCOMMS_MAX_TX_BUFFER_LEN, GFP_NOFS);
/* Due backwards compatibility with 3.1 we need to use maximum
* possible dlm message size to be sure the message will fit and
* not having out of bounds issues. However on sending side 3.2
* might send less.
*/
ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
if (!ls->ls_recover_buf)
goto out_lkbidr;
......@@ -698,7 +704,7 @@ int dlm_new_lockspace(const char *name, const char *cluster,
error = 0;
if (!ls_count) {
dlm_scand_stop();
dlm_lowcomms_shutdown();
dlm_midcomms_shutdown();
dlm_lowcomms_stop();
}
out:
......@@ -787,7 +793,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
if (ls_count == 1) {
dlm_scand_stop();
dlm_lowcomms_shutdown();
dlm_midcomms_shutdown();
}
dlm_callback_stop(ls);
......
This diff is collapsed.
......@@ -12,7 +12,22 @@
#ifndef __LOWCOMMS_DOT_H__
#define __LOWCOMMS_DOT_H__
#define LOWCOMMS_MAX_TX_BUFFER_LEN 4096
#include "dlm_internal.h"
#define DLM_MIDCOMMS_OPT_LEN sizeof(struct dlm_opts)
#define DLM_MAX_APP_BUFSIZE (DLM_MAX_SOCKET_BUFSIZE - \
DLM_MIDCOMMS_OPT_LEN)
#define CONN_HASH_SIZE 32
/* This is deliberately very simple because most clusters have simple
* sequential nodeids, so we should be able to go straight to a connection
* struct in the array
*/
static inline int nodeid_hash(int nodeid)
{
return nodeid & (CONN_HASH_SIZE-1);
}
/* switch to check if dlm is running */
extern int dlm_allow_conn;
......@@ -22,8 +37,12 @@ void dlm_lowcomms_shutdown(void);
void dlm_lowcomms_stop(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc);
void dlm_lowcomms_commit_buffer(void *mh);
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
char **ppc, void (*cb)(struct dlm_mhandle *mh),
struct dlm_mhandle *mh);
void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
void dlm_lowcomms_put_msg(struct dlm_msg *msg);
int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
......
......@@ -15,6 +15,7 @@
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"
int dlm_slots_version(struct dlm_header *h)
......@@ -270,7 +271,7 @@ int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
log_slots(ls, gen, num, NULL, array, array_size);
max_slots = (LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom) -
max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
if (num > max_slots) {
......@@ -329,6 +330,7 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
memb->nodeid = node->nodeid;
memb->weight = node->weight;
memb->comm_seq = node->comm_seq;
dlm_midcomms_add_member(node->nodeid);
add_ordered_member(ls, memb);
ls->ls_num_nodes++;
return 0;
......@@ -359,26 +361,34 @@ int dlm_is_removed(struct dlm_ls *ls, int nodeid)
return 0;
}
static void clear_memb_list(struct list_head *head)
static void clear_memb_list(struct list_head *head,
void (*after_del)(int nodeid))
{
struct dlm_member *memb;
while (!list_empty(head)) {
memb = list_entry(head->next, struct dlm_member, list);
list_del(&memb->list);
if (after_del)
after_del(memb->nodeid);
kfree(memb);
}
}
static void clear_members_cb(int nodeid)
{
dlm_midcomms_remove_member(nodeid);
}
void dlm_clear_members(struct dlm_ls *ls)
{
clear_memb_list(&ls->ls_nodes);
clear_memb_list(&ls->ls_nodes, clear_members_cb);
ls->ls_num_nodes = 0;
}
void dlm_clear_members_gone(struct dlm_ls *ls)
{
clear_memb_list(&ls->ls_nodes_gone);
clear_memb_list(&ls->ls_nodes_gone, NULL);
}
static void make_member_array(struct dlm_ls *ls)
......@@ -552,6 +562,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
neg++;
list_move(&memb->list, &ls->ls_nodes_gone);
dlm_midcomms_remove_member(memb->nodeid);
ls->ls_num_nodes--;
dlm_lsop_recover_slot(ls, memb);
}
......@@ -576,12 +587,18 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
*neg_out = neg;
error = ping_members(ls);
if (!error || error == -EPROTO) {
/* error -EINTR means that a new recovery action is triggered.
* We ignore this recovery action and let run the new one which might
* have new member configuration.
*/
if (error == -EINTR)
error = 0;
/* new_lockspace() may be waiting to know if the config
is good or bad */
* is good or bad
*/
ls->ls_members_result = error;
complete(&ls->ls_members_done);
}
log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
return error;
......
This diff is collapsed.
......@@ -12,7 +12,22 @@
#ifndef __MIDCOMMS_DOT_H__
#define __MIDCOMMS_DOT_H__
struct midcomms_node;
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh);
int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void);
void dlm_midcomms_shutdown(void);
void dlm_midcomms_add_member(int nodeid);
void dlm_midcomms_remove_member(int nodeid);
void dlm_midcomms_unack_msg_resend(int nodeid);
const char *dlm_midcomms_state(struct midcomms_node *node);
unsigned long dlm_midcomms_flags(struct midcomms_node *node);
int dlm_midcomms_send_queue_cnt(struct midcomms_node *node);
uint32_t dlm_midcomms_version(struct midcomms_node *node);
#endif /* __MIDCOMMS_DOT_H__ */
......@@ -27,25 +27,15 @@ static int rcom_response(struct dlm_ls *ls)
return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
}
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, char *mb, int mb_len)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
char *mb;
int mb_len = sizeof(struct dlm_rcom) + len;
mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
if (!mh) {
log_print("create_rcom to %d type %d len %d ENOBUFS",
to_nodeid, type, len);
return -ENOBUFS;
}
rc = (struct dlm_rcom *) mb;
rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
rc->rc_header.h_lockspace = ls->ls_global_id;
rc->rc_header.u.h_lockspace = ls->ls_global_id;
rc->rc_header.h_nodeid = dlm_our_nodeid();
rc->rc_header.h_length = mb_len;
rc->rc_header.h_cmd = DLM_RCOM;
......@@ -56,16 +46,67 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
rc->rc_seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
*mh_ret = mh;
*rc_ret = rc;
}
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
{
int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_mhandle *mh;
char *mb;
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
if (!mh) {
log_print("%s to %d type %d len %d ENOBUFS",
__func__, to_nodeid, type, len);
return -ENOBUFS;
}
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
*mh_ret = mh;
return 0;
}
static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
int len, struct dlm_rcom **rc_ret,
struct dlm_msg **msg_ret)
{
int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_msg *msg;
char *mb;
msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
NULL, NULL);
if (!msg) {
log_print("create_rcom to %d type %d len %d ENOBUFS",
to_nodeid, type, len);
return -ENOBUFS;
}
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
*msg_ret = msg;
return 0;
}
static void _send_rcom(struct dlm_ls *ls, struct dlm_rcom *rc)
{
dlm_rcom_out(rc);
}
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
struct dlm_rcom *rc)
{
dlm_rcom_out(rc);
dlm_lowcomms_commit_buffer(mh);
_send_rcom(ls, rc);
dlm_midcomms_commit_mhandle(mh);
}
static void send_rcom_stateless(struct dlm_ls *ls, struct dlm_msg *msg,
struct dlm_rcom *rc)
{
_send_rcom(ls, rc);
dlm_lowcomms_commit_msg(msg);
dlm_lowcomms_put_msg(msg);
}
static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
......@@ -141,7 +182,7 @@ static void disallow_sync_reply(struct dlm_ls *ls)
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
struct dlm_msg *msg;
int error = 0;
ls->ls_recover_nodeid = nodeid;
......@@ -153,17 +194,17 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
}
retry:
error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
sizeof(struct rcom_status), &rc, &mh);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
sizeof(struct rcom_status), &rc, &msg);
if (error)
goto out;
set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
send_rcom(ls, mh, rc);
send_rcom_stateless(ls, msg, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
......@@ -191,11 +232,11 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
struct rcom_status *rs;
uint32_t status;
int nodeid = rc_in->rc_header.h_nodeid;
int len = sizeof(struct rcom_config);
struct dlm_msg *msg;
int num_slots = 0;
int error;
......@@ -218,8 +259,8 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
len += num_slots * sizeof(struct rcom_slot);
do_create:
error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
len, &rc, &mh);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
len, &rc, &msg);
if (error)
return;
......@@ -246,7 +287,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_recover_lock);
do_send:
send_rcom(ls, mh, rc);
send_rcom_stateless(ls, msg, rc);
}
static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
......@@ -271,21 +312,22 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
struct dlm_msg *msg;
int error = 0;
ls->ls_recover_nodeid = nodeid;
retry:
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
&rc, &msg);
if (error)
goto out;
memcpy(rc->rc_buf, last_name, last_len);
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
send_rcom(ls, mh, rc);
send_rcom_stateless(ls, msg, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
......@@ -298,14 +340,15 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
int error, inlen, outlen, nodeid;
struct dlm_msg *msg;
nodeid = rc_in->rc_header.h_nodeid;
inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
outlen = LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom);
outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
&rc, &msg);
if (error)
return;
rc->rc_id = rc_in->rc_id;
......@@ -313,7 +356,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
send_rcom(ls, mh, rc);
send_rcom_stateless(ls, msg, rc);
}
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
......@@ -342,10 +385,6 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
if (error)
return;
/* Old code would send this special id to trigger a debug dump. */
if (rc_in->rc_id == 0xFFFFFFFF) {
log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
......@@ -353,6 +392,10 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
return;
}
error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
if (error)
return;
error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
if (error)
......@@ -458,14 +501,14 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
char *mb;
int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
if (!mh)
return -ENOBUFS;
rc = (struct dlm_rcom *) mb;
rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
rc->rc_header.h_nodeid = dlm_our_nodeid();
rc->rc_header.h_length = mb_len;
rc->rc_header.h_cmd = DLM_RCOM;
......@@ -479,7 +522,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
rf->rf_lvblen = cpu_to_le32(~0U);
dlm_rcom_out(rc);
dlm_lowcomms_commit_buffer(mh);
dlm_midcomms_commit_mhandle(mh);
return 0;
}
......
......@@ -20,18 +20,20 @@
#define DLM_ERRNO_ETIMEDOUT 110
#define DLM_ERRNO_EINPROGRESS 115
static void header_out(struct dlm_header *hd)
void header_out(struct dlm_header *hd)
{
hd->h_version = cpu_to_le32(hd->h_version);
hd->h_lockspace = cpu_to_le32(hd->h_lockspace);
/* does it for others u32 in union as well */
hd->u.h_lockspace = cpu_to_le32(hd->u.h_lockspace);
hd->h_nodeid = cpu_to_le32(hd->h_nodeid);
hd->h_length = cpu_to_le16(hd->h_length);
}
static void header_in(struct dlm_header *hd)
void header_in(struct dlm_header *hd)
{
hd->h_version = le32_to_cpu(hd->h_version);
hd->h_lockspace = le32_to_cpu(hd->h_lockspace);
/* does it for others u32 in union as well */
hd->u.h_lockspace = le32_to_cpu(hd->u.h_lockspace);
hd->h_nodeid = le32_to_cpu(hd->h_nodeid);
hd->h_length = le16_to_cpu(hd->h_length);
}
......
......@@ -15,6 +15,8 @@ void dlm_message_out(struct dlm_message *ms);
void dlm_message_in(struct dlm_message *ms);
void dlm_rcom_out(struct dlm_rcom *rc);
void dlm_rcom_in(struct dlm_rcom *rc);
void header_out(struct dlm_header *hd);
void header_in(struct dlm_header *hd);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment