Commit c024a811 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-5.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set continues the ongoing rework of the low level communication
  layer in the dlm.

  The focus here is on improvements to connection handling, and
  reworking the receiving of messages"

* tag 'dlm-5.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: fix race in nodeid2con
  fs: dlm: rework receive handling
  fs: dlm: disallow buffer size below default
  fs: dlm: handle range check as callback
  fs: dlm: fix mark per nodeid setting
  fs: dlm: remove lock dependency warning
  fs: dlm: use free_con to free connection
  fs: dlm: handle possible othercon writequeues
  fs: dlm: move free writequeue into con free
  fs: dlm: fix configfs memory leak
  fs: dlm: fix dlm_local_addr memory leak
  fs: dlm: make connection hash lockless
  fs: dlm: synchronize dlm before shutdown
parents 6f5032a8 4f2b30fd
...@@ -4,6 +4,7 @@ menuconfig DLM ...@@ -4,6 +4,7 @@ menuconfig DLM
depends on INET depends on INET
depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n) depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n)
select IP_SCTP select IP_SCTP
select SRCU
help help
A general purpose distributed lock manager for kernel or userspace A general purpose distributed lock manager for kernel or userspace
applications. applications.
......
...@@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item, ...@@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item,
CONFIGFS_ATTR(cluster_, cluster_name); CONFIGFS_ATTR(cluster_, cluster_name);
static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
int *info_field, int check_zero, int *info_field, bool (*check_cb)(unsigned int x),
const char *buf, size_t len) const char *buf, size_t len)
{ {
unsigned int x; unsigned int x;
...@@ -137,7 +137,7 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, ...@@ -137,7 +137,7 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
if (rc) if (rc)
return rc; return rc;
if (check_zero && !x) if (check_cb && check_cb(x))
return -EINVAL; return -EINVAL;
*cl_field = x; *cl_field = x;
...@@ -146,13 +146,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, ...@@ -146,13 +146,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
return len; return len;
} }
#define CLUSTER_ATTR(name, check_zero) \ #define CLUSTER_ATTR(name, check_cb) \
static ssize_t cluster_##name##_store(struct config_item *item, \ static ssize_t cluster_##name##_store(struct config_item *item, \
const char *buf, size_t len) \ const char *buf, size_t len) \
{ \ { \
struct dlm_cluster *cl = config_item_to_cluster(item); \ struct dlm_cluster *cl = config_item_to_cluster(item); \
return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \ return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \
check_zero, buf, len); \ check_cb, buf, len); \
} \ } \
static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \
{ \ { \
...@@ -161,20 +161,30 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \ ...@@ -161,20 +161,30 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \
} \ } \
CONFIGFS_ATTR(cluster_, name); CONFIGFS_ATTR(cluster_, name);
CLUSTER_ATTR(tcp_port, 1); static bool dlm_check_zero(unsigned int x)
CLUSTER_ATTR(buffer_size, 1); {
CLUSTER_ATTR(rsbtbl_size, 1); return !x;
CLUSTER_ATTR(recover_timer, 1); }
CLUSTER_ATTR(toss_secs, 1);
CLUSTER_ATTR(scan_secs, 1); static bool dlm_check_buffer_size(unsigned int x)
CLUSTER_ATTR(log_debug, 0); {
CLUSTER_ATTR(log_info, 0); return (x < DEFAULT_BUFFER_SIZE);
CLUSTER_ATTR(protocol, 0); }
CLUSTER_ATTR(mark, 0);
CLUSTER_ATTR(timewarn_cs, 1); CLUSTER_ATTR(tcp_port, dlm_check_zero);
CLUSTER_ATTR(waitwarn_us, 0); CLUSTER_ATTR(buffer_size, dlm_check_buffer_size);
CLUSTER_ATTR(new_rsb_count, 0); CLUSTER_ATTR(rsbtbl_size, dlm_check_zero);
CLUSTER_ATTR(recover_callbacks, 0); CLUSTER_ATTR(recover_timer, dlm_check_zero);
CLUSTER_ATTR(toss_secs, dlm_check_zero);
CLUSTER_ATTR(scan_secs, dlm_check_zero);
CLUSTER_ATTR(log_debug, NULL);
CLUSTER_ATTR(log_info, NULL);
CLUSTER_ATTR(protocol, NULL);
CLUSTER_ATTR(mark, NULL);
CLUSTER_ATTR(timewarn_cs, dlm_check_zero);
CLUSTER_ATTR(waitwarn_us, NULL);
CLUSTER_ATTR(new_rsb_count, NULL);
CLUSTER_ATTR(recover_callbacks, NULL);
static struct configfs_attribute *cluster_attrs[] = { static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port, [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port,
...@@ -221,6 +231,7 @@ struct dlm_space { ...@@ -221,6 +231,7 @@ struct dlm_space {
struct list_head members; struct list_head members;
struct mutex members_lock; struct mutex members_lock;
int members_count; int members_count;
struct dlm_nodes *nds;
}; };
struct dlm_comms { struct dlm_comms {
...@@ -430,6 +441,7 @@ static struct config_group *make_space(struct config_group *g, const char *name) ...@@ -430,6 +441,7 @@ static struct config_group *make_space(struct config_group *g, const char *name)
INIT_LIST_HEAD(&sp->members); INIT_LIST_HEAD(&sp->members);
mutex_init(&sp->members_lock); mutex_init(&sp->members_lock);
sp->members_count = 0; sp->members_count = 0;
sp->nds = nds;
return &sp->group; return &sp->group;
fail: fail:
...@@ -451,6 +463,7 @@ static void drop_space(struct config_group *g, struct config_item *i) ...@@ -451,6 +463,7 @@ static void drop_space(struct config_group *g, struct config_item *i)
static void release_space(struct config_item *i) static void release_space(struct config_item *i)
{ {
struct dlm_space *sp = config_item_to_space(i); struct dlm_space *sp = config_item_to_space(i);
kfree(sp->nds);
kfree(sp); kfree(sp);
} }
...@@ -857,18 +870,22 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) ...@@ -857,18 +870,22 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
return 0; return 0;
} }
int dlm_comm_mark(int nodeid, unsigned int *mark) void dlm_comm_mark(int nodeid, unsigned int *mark)
{ {
struct dlm_comm *cm; struct dlm_comm *cm;
cm = get_comm(nodeid); cm = get_comm(nodeid);
if (!cm) if (!cm) {
return -ENOENT; *mark = dlm_config.ci_mark;
return;
}
*mark = cm->mark; if (cm->mark)
put_comm(cm); *mark = cm->mark;
else
*mark = dlm_config.ci_mark;
return 0; put_comm(cm);
} }
int dlm_our_nodeid(void) int dlm_our_nodeid(void)
...@@ -889,7 +906,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) ...@@ -889,7 +906,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
/* Config file defaults */ /* Config file defaults */
#define DEFAULT_TCP_PORT 21064 #define DEFAULT_TCP_PORT 21064
#define DEFAULT_BUFFER_SIZE 4096
#define DEFAULT_RSBTBL_SIZE 1024 #define DEFAULT_RSBTBL_SIZE 1024
#define DEFAULT_RECOVER_TIMER 5 #define DEFAULT_RECOVER_TIMER 5
#define DEFAULT_TOSS_SECS 10 #define DEFAULT_TOSS_SECS 10
......
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
#ifndef __CONFIG_DOT_H__ #ifndef __CONFIG_DOT_H__
#define __CONFIG_DOT_H__ #define __CONFIG_DOT_H__
#define DEFAULT_BUFFER_SIZE 4096
struct dlm_config_node { struct dlm_config_node {
int nodeid; int nodeid;
int weight; int weight;
...@@ -46,7 +48,7 @@ void dlm_config_exit(void); ...@@ -46,7 +48,7 @@ void dlm_config_exit(void);
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out); int *count_out);
int dlm_comm_seq(int nodeid, uint32_t *seq); int dlm_comm_seq(int nodeid, uint32_t *seq);
int dlm_comm_mark(int nodeid, unsigned int *mark); void dlm_comm_mark(int nodeid, unsigned int *mark);
int dlm_our_nodeid(void); int dlm_our_nodeid(void);
int dlm_our_addr(struct sockaddr_storage *addr, int num); int dlm_our_addr(struct sockaddr_storage *addr, int num);
......
...@@ -65,40 +65,6 @@ ...@@ -65,40 +65,6 @@
#define MAX_SEND_MSG_COUNT 25 #define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct cbuf {
unsigned int base;
unsigned int len;
unsigned int mask;
};
static void cbuf_add(struct cbuf *cb, int n)
{
cb->len += n;
}
static int cbuf_data(struct cbuf *cb)
{
return ((cb->base + cb->len) & cb->mask);
}
static void cbuf_init(struct cbuf *cb, int size)
{
cb->base = cb->len = 0;
cb->mask = size-1;
}
static void cbuf_eat(struct cbuf *cb, int n)
{
cb->len -= n;
cb->base += n;
cb->base &= cb->mask;
}
static bool cbuf_empty(struct cbuf *cb)
{
return cb->len == 0;
}
struct connection { struct connection {
struct socket *sock; /* NULL if not connected */ struct socket *sock; /* NULL if not connected */
uint32_t nodeid; /* So we know who we are in the list */ uint32_t nodeid; /* So we know who we are in the list */
...@@ -117,8 +83,6 @@ struct connection { ...@@ -117,8 +83,6 @@ struct connection {
int (*rx_action) (struct connection *); /* What to do when active */ int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */ void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
struct page *rx_page;
struct cbuf cb;
int retries; int retries;
#define MAX_CONNECT_RETRIES 3 #define MAX_CONNECT_RETRIES 3
struct hlist_node list; struct hlist_node list;
...@@ -126,6 +90,10 @@ struct connection { ...@@ -126,6 +90,10 @@ struct connection {
struct work_struct rwork; /* Receive workqueue */ struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */ struct work_struct swork; /* Send workqueue */
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
unsigned char *rx_buf;
int rx_buflen;
int rx_leftover;
struct rcu_head rcu;
}; };
#define sock2con(x) ((struct connection *)(x)->sk_user_data) #define sock2con(x) ((struct connection *)(x)->sk_user_data)
...@@ -167,8 +135,8 @@ static struct workqueue_struct *recv_workqueue; ...@@ -167,8 +135,8 @@ static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue; static struct workqueue_struct *send_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE]; static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock); static DEFINE_SPINLOCK(connections_lock);
static struct kmem_cache *con_cache; DEFINE_STATIC_SRCU(connections_srcu);
static void process_recv_sockets(struct work_struct *work); static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work);
...@@ -184,15 +152,20 @@ static inline int nodeid_hash(int nodeid) ...@@ -184,15 +152,20 @@ static inline int nodeid_hash(int nodeid)
static struct connection *__find_con(int nodeid) static struct connection *__find_con(int nodeid)
{ {
int r; int r, idx;
struct connection *con; struct connection *con;
r = nodeid_hash(nodeid); r = nodeid_hash(nodeid);
hlist_for_each_entry(con, &connection_hash[r], list) { idx = srcu_read_lock(&connections_srcu);
if (con->nodeid == nodeid) hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
if (con->nodeid == nodeid) {
srcu_read_unlock(&connections_srcu, idx);
return con; return con;
}
} }
srcu_read_unlock(&connections_srcu, idx);
return NULL; return NULL;
} }
...@@ -200,21 +173,25 @@ static struct connection *__find_con(int nodeid) ...@@ -200,21 +173,25 @@ static struct connection *__find_con(int nodeid)
* If 'allocation' is zero then we don't attempt to create a new * If 'allocation' is zero then we don't attempt to create a new
* connection structure for this node. * connection structure for this node.
*/ */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc) static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{ {
struct connection *con = NULL; struct connection *con, *tmp;
int r; int r;
con = __find_con(nodeid); con = __find_con(nodeid);
if (con || !alloc) if (con || !alloc)
return con; return con;
con = kmem_cache_zalloc(con_cache, alloc); con = kzalloc(sizeof(*con), alloc);
if (!con) if (!con)
return NULL; return NULL;
r = nodeid_hash(nodeid); con->rx_buflen = dlm_config.ci_buffer_size;
hlist_add_head(&con->list, &connection_hash[r]); con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
if (!con->rx_buf) {
kfree(con);
return NULL;
}
con->nodeid = nodeid; con->nodeid = nodeid;
mutex_init(&con->sock_mutex); mutex_init(&con->sock_mutex);
...@@ -233,31 +210,41 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) ...@@ -233,31 +210,41 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
con->rx_action = zerocon->rx_action; con->rx_action = zerocon->rx_action;
} }
r = nodeid_hash(nodeid);
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
* we just check in rare cases of recently added nodes again
* under protection of connections_lock. If this is the case we
* abort our connection creation and return the existing connection.
*/
tmp = __find_con(nodeid);
if (tmp) {
spin_unlock(&connections_lock);
kfree(con->rx_buf);
kfree(con);
return tmp;
}
hlist_add_head_rcu(&con->list, &connection_hash[r]);
spin_unlock(&connections_lock);
return con; return con;
} }
/* Loop round all connections */ /* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c)) static void foreach_conn(void (*conn_func)(struct connection *c))
{ {
int i; int i, idx;
struct hlist_node *n;
struct connection *con; struct connection *con;
idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) { for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_safe(con, n, &connection_hash[i], list) hlist_for_each_entry_rcu(con, &connection_hash[i], list)
conn_func(con); conn_func(con);
} }
} srcu_read_unlock(&connections_srcu, idx);
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
struct connection *con;
mutex_lock(&connections_lock);
con = __nodeid2con(nodeid, allocation);
mutex_unlock(&connections_lock);
return con;
} }
static struct dlm_node_addr *find_node_addr(int nodeid) static struct dlm_node_addr *find_node_addr(int nodeid)
...@@ -614,11 +601,8 @@ static void close_connection(struct connection *con, bool and_other, ...@@ -614,11 +601,8 @@ static void close_connection(struct connection *con, bool and_other,
/* Will only re-enter once. */ /* Will only re-enter once. */
close_connection(con->othercon, false, true, true); close_connection(con->othercon, false, true, true);
} }
if (con->rx_page) {
__free_page(con->rx_page);
con->rx_page = NULL;
}
con->rx_leftover = 0;
con->retries = 0; con->retries = 0;
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags); clear_bit(CF_CLOSING, &con->flags);
...@@ -672,16 +656,33 @@ static void dlm_tcp_shutdown(struct connection *con) ...@@ -672,16 +656,33 @@ static void dlm_tcp_shutdown(struct connection *con)
shutdown_connection(con); shutdown_connection(con);
} }
static int con_realloc_receive_buf(struct connection *con, int newlen)
{
unsigned char *newbuf;
newbuf = kmalloc(newlen, GFP_NOFS);
if (!newbuf)
return -ENOMEM;
/* copy any leftover from last receive */
if (con->rx_leftover)
memmove(newbuf, con->rx_buf, con->rx_leftover);
/* swap to new buffer space */
kfree(con->rx_buf);
con->rx_buflen = newlen;
con->rx_buf = newbuf;
return 0;
}
/* Data received from remote end */ /* Data received from remote end */
static int receive_from_sock(struct connection *con) static int receive_from_sock(struct connection *con)
{ {
int ret = 0;
struct msghdr msg = {};
struct kvec iov[2];
unsigned len;
int r;
int call_again_soon = 0; int call_again_soon = 0;
int nvec; struct msghdr msg;
struct kvec iov;
int ret, buflen;
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
...@@ -689,71 +690,55 @@ static int receive_from_sock(struct connection *con) ...@@ -689,71 +690,55 @@ static int receive_from_sock(struct connection *con)
ret = -EAGAIN; ret = -EAGAIN;
goto out_close; goto out_close;
} }
if (con->nodeid == 0) { if (con->nodeid == 0) {
ret = -EINVAL; ret = -EINVAL;
goto out_close; goto out_close;
} }
if (con->rx_page == NULL) { /* realloc if we get new buffer size to read out */
/* buflen = dlm_config.ci_buffer_size;
* This doesn't need to be atomic, but I think it should if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
* improve performance if it is. ret = con_realloc_receive_buf(con, buflen);
*/ if (ret < 0)
con->rx_page = alloc_page(GFP_ATOMIC);
if (con->rx_page == NULL)
goto out_resched; goto out_resched;
cbuf_init(&con->cb, PAGE_SIZE);
} }
/* /* calculate new buffer parameter regarding last receive and
* iov[0] is the bit of the circular buffer between the current end * possible leftover bytes
* point (cb.base + cb.len) and the end of the buffer.
*/
iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
iov[1].iov_len = 0;
nvec = 1;
/*
* iov[1] is the bit of the circular buffer between the start of the
* buffer and the start of the currently used section (cb.base)
*/ */
if (cbuf_data(&con->cb) >= con->cb.base) { iov.iov_base = con->rx_buf + con->rx_leftover;
iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb); iov.iov_len = con->rx_buflen - con->rx_leftover;
iov[1].iov_len = con->cb.base;
iov[1].iov_base = page_address(con->rx_page);
nvec = 2;
}
len = iov[0].iov_len + iov[1].iov_len;
iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);
r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
msg.msg_flags);
if (ret <= 0) if (ret <= 0)
goto out_close; goto out_close;
else if (ret == len) else if (ret == iov.iov_len)
call_again_soon = 1; call_again_soon = 1;
cbuf_add(&con->cb, ret); /* new buflen according readed bytes and leftover from last receive */
ret = dlm_process_incoming_buffer(con->nodeid, buflen = ret + con->rx_leftover;
page_address(con->rx_page), ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
con->cb.base, con->cb.len, if (ret < 0)
PAGE_SIZE); goto out_close;
if (ret < 0) {
log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
ret, page_address(con->rx_page), con->cb.base,
con->cb.len, r);
cbuf_eat(&con->cb, r);
} else {
cbuf_eat(&con->cb, ret);
}
if (cbuf_empty(&con->cb) && !call_again_soon) { /* calculate leftover bytes from process and put it into begin of
__free_page(con->rx_page); * the receive buffer, so next receive we have the full message
con->rx_page = NULL; * at the start address of the receive buffer.
*/
con->rx_leftover = buflen - ret;
if (con->rx_leftover) {
memmove(con->rx_buf, con->rx_buf + ret,
con->rx_leftover);
call_again_soon = true;
} }
if (call_again_soon) if (call_again_soon)
goto out_resched; goto out_resched;
mutex_unlock(&con->sock_mutex); mutex_unlock(&con->sock_mutex);
return 0; return 0;
...@@ -791,13 +776,11 @@ static int accept_from_sock(struct connection *con) ...@@ -791,13 +776,11 @@ static int accept_from_sock(struct connection *con)
int nodeid; int nodeid;
struct connection *newcon; struct connection *newcon;
struct connection *addcon; struct connection *addcon;
unsigned int mark;
mutex_lock(&connections_lock);
if (!dlm_allow_conn) { if (!dlm_allow_conn) {
mutex_unlock(&connections_lock);
return -1; return -1;
} }
mutex_unlock(&connections_lock);
mutex_lock_nested(&con->sock_mutex, 0); mutex_lock_nested(&con->sock_mutex, 0);
...@@ -830,6 +813,9 @@ static int accept_from_sock(struct connection *con) ...@@ -830,6 +813,9 @@ static int accept_from_sock(struct connection *con)
return -1; return -1;
} }
dlm_comm_mark(nodeid, &mark);
sock_set_mark(newsock->sk, mark);
log_print("got connection from %d", nodeid); log_print("got connection from %d", nodeid);
/* Check to see if we already have a connection to this node. This /* Check to see if we already have a connection to this node. This
...@@ -847,13 +833,24 @@ static int accept_from_sock(struct connection *con) ...@@ -847,13 +833,24 @@ static int accept_from_sock(struct connection *con)
struct connection *othercon = newcon->othercon; struct connection *othercon = newcon->othercon;
if (!othercon) { if (!othercon) {
othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
if (!othercon) { if (!othercon) {
log_print("failed to allocate incoming socket"); log_print("failed to allocate incoming socket");
mutex_unlock(&newcon->sock_mutex); mutex_unlock(&newcon->sock_mutex);
result = -ENOMEM; result = -ENOMEM;
goto accept_err; goto accept_err;
} }
othercon->rx_buflen = dlm_config.ci_buffer_size;
othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
if (!othercon->rx_buf) {
mutex_unlock(&newcon->sock_mutex);
kfree(othercon);
log_print("failed to allocate incoming socket receive buffer");
result = -ENOMEM;
goto accept_err;
}
othercon->nodeid = nodeid; othercon->nodeid = nodeid;
othercon->rx_action = receive_from_sock; othercon->rx_action = receive_from_sock;
mutex_init(&othercon->sock_mutex); mutex_init(&othercon->sock_mutex);
...@@ -975,6 +972,8 @@ static void sctp_connect_to_sock(struct connection *con) ...@@ -975,6 +972,8 @@ static void sctp_connect_to_sock(struct connection *con)
return; return;
} }
dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
/* Some odd races can cause double-connects, ignore them */ /* Some odd races can cause double-connects, ignore them */
...@@ -999,11 +998,6 @@ static void sctp_connect_to_sock(struct connection *con) ...@@ -999,11 +998,6 @@ static void sctp_connect_to_sock(struct connection *con)
if (result < 0) if (result < 0)
goto socket_err; goto socket_err;
/* set skb mark */
result = dlm_comm_mark(con->nodeid, &mark);
if (result < 0)
goto bind_err;
sock_set_mark(sock->sk, mark); sock_set_mark(sock->sk, mark);
con->rx_action = receive_from_sock; con->rx_action = receive_from_sock;
...@@ -1076,6 +1070,8 @@ static void tcp_connect_to_sock(struct connection *con) ...@@ -1076,6 +1070,8 @@ static void tcp_connect_to_sock(struct connection *con)
return; return;
} }
dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
if (con->retries++ > MAX_CONNECT_RETRIES) if (con->retries++ > MAX_CONNECT_RETRIES)
goto out; goto out;
...@@ -1090,11 +1086,6 @@ static void tcp_connect_to_sock(struct connection *con) ...@@ -1090,11 +1086,6 @@ static void tcp_connect_to_sock(struct connection *con)
if (result < 0) if (result < 0)
goto out_err; goto out_err;
/* set skb mark */
result = dlm_comm_mark(con->nodeid, &mark);
if (result < 0)
goto out_err;
sock_set_mark(sock->sk, mark); sock_set_mark(sock->sk, mark);
memset(&saddr, 0, sizeof(saddr)); memset(&saddr, 0, sizeof(saddr));
...@@ -1238,6 +1229,14 @@ static void init_local(void) ...@@ -1238,6 +1229,14 @@ static void init_local(void)
} }
} }
static void deinit_local(void)
{
int i;
for (i = 0; i < dlm_local_count; i++)
kfree(dlm_local_addr[i]);
}
/* Initialise SCTP socket and bind to all interfaces */ /* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void) static int sctp_listen_for_all(void)
{ {
...@@ -1546,13 +1545,6 @@ static void process_send_sockets(struct work_struct *work) ...@@ -1546,13 +1545,6 @@ static void process_send_sockets(struct work_struct *work)
send_to_sock(con); send_to_sock(con);
} }
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
foreach_conn(clean_one_writequeue);
}
static void work_stop(void) static void work_stop(void)
{ {
if (recv_workqueue) if (recv_workqueue)
...@@ -1608,26 +1600,34 @@ static void shutdown_conn(struct connection *con) ...@@ -1608,26 +1600,34 @@ static void shutdown_conn(struct connection *con)
con->shutdown_action(con); con->shutdown_action(con);
} }
static void connection_release(struct rcu_head *rcu)
{
struct connection *con = container_of(rcu, struct connection, rcu);
kfree(con->rx_buf);
kfree(con);
}
static void free_conn(struct connection *con) static void free_conn(struct connection *con)
{ {
close_connection(con, true, true, true); close_connection(con, true, true, true);
if (con->othercon) spin_lock(&connections_lock);
kmem_cache_free(con_cache, con->othercon); hlist_del_rcu(&con->list);
hlist_del(&con->list); spin_unlock(&connections_lock);
kmem_cache_free(con_cache, con); if (con->othercon) {
clean_one_writequeue(con->othercon);
call_rcu(&con->othercon->rcu, connection_release);
}
clean_one_writequeue(con);
call_rcu(&con->rcu, connection_release);
} }
static void work_flush(void) static void work_flush(void)
{ {
int ok; int ok, idx;
int i; int i;
struct hlist_node *n;
struct connection *con; struct connection *con;
if (recv_workqueue)
flush_workqueue(recv_workqueue);
if (send_workqueue)
flush_workqueue(send_workqueue);
do { do {
ok = 1; ok = 1;
foreach_conn(stop_conn); foreach_conn(stop_conn);
...@@ -1635,9 +1635,10 @@ static void work_flush(void) ...@@ -1635,9 +1635,10 @@ static void work_flush(void)
flush_workqueue(recv_workqueue); flush_workqueue(recv_workqueue);
if (send_workqueue) if (send_workqueue)
flush_workqueue(send_workqueue); flush_workqueue(send_workqueue);
idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE && ok; i++) { for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
hlist_for_each_entry_safe(con, n, hlist_for_each_entry_rcu(con, &connection_hash[i],
&connection_hash[i], list) { list) {
ok &= test_bit(CF_READ_PENDING, &con->flags); ok &= test_bit(CF_READ_PENDING, &con->flags);
ok &= test_bit(CF_WRITE_PENDING, &con->flags); ok &= test_bit(CF_WRITE_PENDING, &con->flags);
if (con->othercon) { if (con->othercon) {
...@@ -1648,6 +1649,7 @@ static void work_flush(void) ...@@ -1648,6 +1649,7 @@ static void work_flush(void)
} }
} }
} }
srcu_read_unlock(&connections_srcu, idx);
} while (!ok); } while (!ok);
} }
...@@ -1656,16 +1658,18 @@ void dlm_lowcomms_stop(void) ...@@ -1656,16 +1658,18 @@ void dlm_lowcomms_stop(void)
/* Set all the flags to prevent any /* Set all the flags to prevent any
socket activity. socket activity.
*/ */
mutex_lock(&connections_lock);
dlm_allow_conn = 0; dlm_allow_conn = 0;
mutex_unlock(&connections_lock);
if (recv_workqueue)
flush_workqueue(recv_workqueue);
if (send_workqueue)
flush_workqueue(send_workqueue);
foreach_conn(shutdown_conn); foreach_conn(shutdown_conn);
work_flush(); work_flush();
clean_writequeues();
foreach_conn(free_conn); foreach_conn(free_conn);
work_stop(); work_stop();
deinit_local();
kmem_cache_destroy(con_cache);
} }
int dlm_lowcomms_start(void) int dlm_lowcomms_start(void)
...@@ -1684,16 +1688,9 @@ int dlm_lowcomms_start(void) ...@@ -1684,16 +1688,9 @@ int dlm_lowcomms_start(void)
goto fail; goto fail;
} }
error = -ENOMEM;
con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
__alignof__(struct connection), 0,
NULL);
if (!con_cache)
goto fail;
error = work_start(); error = work_start();
if (error) if (error)
goto fail_destroy; goto fail;
dlm_allow_conn = 1; dlm_allow_conn = 1;
...@@ -1710,12 +1707,8 @@ int dlm_lowcomms_start(void) ...@@ -1710,12 +1707,8 @@ int dlm_lowcomms_start(void)
fail_unlisten: fail_unlisten:
dlm_allow_conn = 0; dlm_allow_conn = 0;
con = nodeid2con(0,0); con = nodeid2con(0,0);
if (con) { if (con)
close_connection(con, false, true, true); free_conn(con);
kmem_cache_free(con_cache, con);
}
fail_destroy:
kmem_cache_destroy(con_cache);
fail: fail:
return error; return error;
} }
......
...@@ -22,114 +22,84 @@ ...@@ -22,114 +22,84 @@
* into packets and sends them to the comms layer. * into packets and sends them to the comms layer.
*/ */
#include <asm/unaligned.h>
#include "dlm_internal.h" #include "dlm_internal.h"
#include "lowcomms.h" #include "lowcomms.h"
#include "config.h" #include "config.h"
#include "lock.h" #include "lock.h"
#include "midcomms.h" #include "midcomms.h"
static void copy_from_cb(void *dst, const void *base, unsigned offset,
unsigned len, unsigned limit)
{
unsigned copy = len;
if ((copy + offset) > limit)
copy = limit - offset;
memcpy(dst, base + offset, copy);
len -= copy;
if (len)
memcpy(dst + copy, base, len);
}
/* /*
* Called from the low-level comms layer to process a buffer of * Called from the low-level comms layer to process a buffer of
* commands. * commands.
*
* Only complete messages are processed here, any "spare" bytes from
* the end of a buffer are saved and tacked onto the front of the next
* message that comes in. I doubt this will happen very often but we
* need to be able to cope with it and I don't want the task to be waiting
* for packets to come in when there is useful work to be done.
*/ */
int dlm_process_incoming_buffer(int nodeid, const void *base, int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
unsigned offset, unsigned len, unsigned limit)
{ {
union { const unsigned char *ptr = buf;
unsigned char __buf[DLM_INBUF_LEN]; const struct dlm_header *hd;
/* this is to force proper alignment on some arches */
union dlm_packet p;
} __tmp;
union dlm_packet *p = &__tmp.p;
int ret = 0;
int err = 0;
uint16_t msglen; uint16_t msglen;
uint32_t lockspace; int ret = 0;
while (len > sizeof(struct dlm_header)) {
/* Copy just the header to check the total length. The
message may wrap around the end of the buffer back to the
start, so we need to use a temp buffer and copy_from_cb. */
copy_from_cb(p, base, offset, sizeof(struct dlm_header),
limit);
msglen = le16_to_cpu(p->header.h_length);
lockspace = p->header.h_lockspace;
err = -EINVAL; while (len >= sizeof(struct dlm_header)) {
if (msglen < sizeof(struct dlm_header)) hd = (struct dlm_header *)ptr;
break;
if (p->header.h_cmd == DLM_MSG) { /* no message should be more than this otherwise we
if (msglen < sizeof(struct dlm_message)) * cannot deliver this message to upper layers
break; */
} else { msglen = get_unaligned_le16(&hd->h_length);
if (msglen < sizeof(struct dlm_rcom)) if (msglen > DEFAULT_BUFFER_SIZE) {
break; log_print("received invalid length header: %u, will abort message parsing",
} msglen);
err = -E2BIG; return -EBADMSG;
if (msglen > dlm_config.ci_buffer_size) {
log_print("message size %d from %d too big, buf len %d",
msglen, nodeid, len);
break;
} }
err = 0;
/* If only part of the full message is contained in this
buffer, then do nothing and wait for lowcomms to call
us again later with more data. We return 0 meaning
we've consumed none of the input buffer. */
/* caller will take care that leftover
* will be parsed next call with more data
*/
if (msglen > len) if (msglen > len)
break; break;
/* Allocate a larger temp buffer if the full message won't fit switch (hd->h_cmd) {
in the buffer on the stack (which should work for most case DLM_MSG:
ordinary messages). */ if (msglen < sizeof(struct dlm_message)) {
log_print("dlm msg too small: %u, will skip this message",
if (msglen > sizeof(__tmp) && p == &__tmp.p) { msglen);
p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); goto skip;
if (p == NULL) }
return ret;
}
copy_from_cb(p, base, offset, msglen, limit); break;
case DLM_RCOM:
if (msglen < sizeof(struct dlm_rcom)) {
log_print("dlm rcom msg too small: %u, will skip this message",
msglen);
goto skip;
}
BUG_ON(lockspace != p->header.h_lockspace); break;
default:
log_print("unsupported h_cmd received: %u, will skip this message",
hd->h_cmd);
goto skip;
}
/* for aligned memory access, we just copy current message
* to begin of the buffer which contains already parsed buffer
* data and should provide align access for upper layers
* because the start address of the buffer has a aligned
* address. This memmove can be removed when the upperlayer
* is capable of unaligned memory access.
*/
memmove(buf, ptr, msglen);
dlm_receive_buffer((union dlm_packet *)buf, nodeid);
skip:
ret += msglen; ret += msglen;
offset += msglen;
offset &= (limit - 1);
len -= msglen; len -= msglen;
ptr += msglen;
dlm_receive_buffer(p, nodeid);
} }
if (p != &__tmp.p) return ret;
kfree(p);
return err ? err : ret;
} }
...@@ -12,8 +12,7 @@ ...@@ -12,8 +12,7 @@
#ifndef __MIDCOMMS_DOT_H__ #ifndef __MIDCOMMS_DOT_H__
#define __MIDCOMMS_DOT_H__ #define __MIDCOMMS_DOT_H__
int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset, int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
unsigned len, unsigned limit);
#endif /* __MIDCOMMS_DOT_H__ */ #endif /* __MIDCOMMS_DOT_H__ */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment