Commit 6bab076a authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-5.13' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This includes more dlm networking cleanups and improvements for making
  dlm shutdowns more robust"

* tag 'dlm-5.13' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: fix missing unlock on error in accept_from_sock()
  fs: dlm: add shutdown hook
  fs: dlm: flush swork on shutdown
  fs: dlm: remove unaligned memory access handling
  fs: dlm: check on minimum msglen size
  fs: dlm: simplify writequeue handling
  fs: dlm: use GFP_ZERO for page buffer
  fs: dlm: change allocation limits
  fs: dlm: add check if dlm is currently running
  fs: dlm: add errno handling to check callback
  fs: dlm: set subclass for othercon sock_mutex
  fs: dlm: set connected bit after accept
  fs: dlm: fix mark setting deadlock
  fs: dlm: fix debugfs dump
parents 9ec1efbf 2fd8db2d
...@@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item, ...@@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item,
CONFIGFS_ATTR(cluster_, cluster_name); CONFIGFS_ATTR(cluster_, cluster_name);
static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
int *info_field, bool (*check_cb)(unsigned int x), int *info_field, int (*check_cb)(unsigned int x),
const char *buf, size_t len) const char *buf, size_t len)
{ {
unsigned int x; unsigned int x;
...@@ -137,8 +137,11 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, ...@@ -137,8 +137,11 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
if (rc) if (rc)
return rc; return rc;
if (check_cb && check_cb(x)) if (check_cb) {
return -EINVAL; rc = check_cb(x);
if (rc)
return rc;
}
*cl_field = x; *cl_field = x;
*info_field = x; *info_field = x;
...@@ -161,17 +164,53 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \ ...@@ -161,17 +164,53 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \
} \ } \
CONFIGFS_ATTR(cluster_, name); CONFIGFS_ATTR(cluster_, name);
static bool dlm_check_zero(unsigned int x) static int dlm_check_protocol_and_dlm_running(unsigned int x)
{ {
return !x; switch (x) {
case 0:
/* TCP */
break;
case 1:
/* SCTP */
break;
default:
return -EINVAL;
}
if (dlm_allow_conn)
return -EBUSY;
return 0;
} }
static bool dlm_check_buffer_size(unsigned int x) static int dlm_check_zero_and_dlm_running(unsigned int x)
{ {
return (x < DEFAULT_BUFFER_SIZE); if (!x)
return -EINVAL;
if (dlm_allow_conn)
return -EBUSY;
return 0;
} }
CLUSTER_ATTR(tcp_port, dlm_check_zero); static int dlm_check_zero(unsigned int x)
{
if (!x)
return -EINVAL;
return 0;
}
static int dlm_check_buffer_size(unsigned int x)
{
if (x < DEFAULT_BUFFER_SIZE)
return -EINVAL;
return 0;
}
CLUSTER_ATTR(tcp_port, dlm_check_zero_and_dlm_running);
CLUSTER_ATTR(buffer_size, dlm_check_buffer_size); CLUSTER_ATTR(buffer_size, dlm_check_buffer_size);
CLUSTER_ATTR(rsbtbl_size, dlm_check_zero); CLUSTER_ATTR(rsbtbl_size, dlm_check_zero);
CLUSTER_ATTR(recover_timer, dlm_check_zero); CLUSTER_ATTR(recover_timer, dlm_check_zero);
...@@ -179,7 +218,7 @@ CLUSTER_ATTR(toss_secs, dlm_check_zero); ...@@ -179,7 +218,7 @@ CLUSTER_ATTR(toss_secs, dlm_check_zero);
CLUSTER_ATTR(scan_secs, dlm_check_zero); CLUSTER_ATTR(scan_secs, dlm_check_zero);
CLUSTER_ATTR(log_debug, NULL); CLUSTER_ATTR(log_debug, NULL);
CLUSTER_ATTR(log_info, NULL); CLUSTER_ATTR(log_info, NULL);
CLUSTER_ATTR(protocol, NULL); CLUSTER_ATTR(protocol, dlm_check_protocol_and_dlm_running);
CLUSTER_ATTR(mark, NULL); CLUSTER_ATTR(mark, NULL);
CLUSTER_ATTR(timewarn_cs, dlm_check_zero); CLUSTER_ATTR(timewarn_cs, dlm_check_zero);
CLUSTER_ATTR(waitwarn_us, NULL); CLUSTER_ATTR(waitwarn_us, NULL);
...@@ -688,6 +727,7 @@ static ssize_t comm_mark_show(struct config_item *item, char *buf) ...@@ -688,6 +727,7 @@ static ssize_t comm_mark_show(struct config_item *item, char *buf)
static ssize_t comm_mark_store(struct config_item *item, const char *buf, static ssize_t comm_mark_store(struct config_item *item, const char *buf,
size_t len) size_t len)
{ {
struct dlm_comm *comm;
unsigned int mark; unsigned int mark;
int rc; int rc;
...@@ -695,7 +735,15 @@ static ssize_t comm_mark_store(struct config_item *item, const char *buf, ...@@ -695,7 +735,15 @@ static ssize_t comm_mark_store(struct config_item *item, const char *buf,
if (rc) if (rc)
return rc; return rc;
config_item_to_comm(item)->mark = mark; if (mark == 0)
mark = dlm_config.ci_mark;
comm = config_item_to_comm(item);
rc = dlm_lowcomms_nodes_set_mark(comm->nodeid, mark);
if (rc)
return rc;
comm->mark = mark;
return len; return len;
} }
...@@ -870,24 +918,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) ...@@ -870,24 +918,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
return 0; return 0;
} }
void dlm_comm_mark(int nodeid, unsigned int *mark)
{
struct dlm_comm *cm;
cm = get_comm(nodeid);
if (!cm) {
*mark = dlm_config.ci_mark;
return;
}
if (cm->mark)
*mark = cm->mark;
else
*mark = dlm_config.ci_mark;
put_comm(cm);
}
int dlm_our_nodeid(void) int dlm_our_nodeid(void)
{ {
return local_comm ? local_comm->nodeid : 0; return local_comm ? local_comm->nodeid : 0;
......
...@@ -48,7 +48,6 @@ void dlm_config_exit(void); ...@@ -48,7 +48,6 @@ void dlm_config_exit(void);
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out); int *count_out);
int dlm_comm_seq(int nodeid, uint32_t *seq); int dlm_comm_seq(int nodeid, uint32_t *seq);
void dlm_comm_mark(int nodeid, unsigned int *mark);
int dlm_our_nodeid(void); int dlm_our_nodeid(void);
int dlm_our_addr(struct sockaddr_storage *addr, int num); int dlm_our_addr(struct sockaddr_storage *addr, int num);
......
...@@ -542,6 +542,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) ...@@ -542,6 +542,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
if (bucket >= ls->ls_rsbtbl_size) { if (bucket >= ls->ls_rsbtbl_size) {
kfree(ri); kfree(ri);
++*pos;
return NULL; return NULL;
} }
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
......
...@@ -3541,8 +3541,6 @@ static int _create_message(struct dlm_ls *ls, int mb_len, ...@@ -3541,8 +3541,6 @@ static int _create_message(struct dlm_ls *ls, int mb_len,
if (!mh) if (!mh)
return -ENOBUFS; return -ENOBUFS;
memset(mb, 0, mb_len);
ms = (struct dlm_message *) mb; ms = (struct dlm_message *) mb;
ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
......
...@@ -404,12 +404,6 @@ static int threads_start(void) ...@@ -404,12 +404,6 @@ static int threads_start(void)
return error; return error;
} }
static void threads_stop(void)
{
dlm_scand_stop();
dlm_lowcomms_stop();
}
static int new_lockspace(const char *name, const char *cluster, static int new_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen, uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops, void *ops_arg, const struct dlm_lockspace_ops *ops, void *ops_arg,
...@@ -702,8 +696,11 @@ int dlm_new_lockspace(const char *name, const char *cluster, ...@@ -702,8 +696,11 @@ int dlm_new_lockspace(const char *name, const char *cluster,
ls_count++; ls_count++;
if (error > 0) if (error > 0)
error = 0; error = 0;
if (!ls_count) if (!ls_count) {
threads_stop(); dlm_scand_stop();
dlm_lowcomms_shutdown();
dlm_lowcomms_stop();
}
out: out:
mutex_unlock(&ls_lock); mutex_unlock(&ls_lock);
return error; return error;
...@@ -788,6 +785,11 @@ static int release_lockspace(struct dlm_ls *ls, int force) ...@@ -788,6 +785,11 @@ static int release_lockspace(struct dlm_ls *ls, int force)
dlm_recoverd_stop(ls); dlm_recoverd_stop(ls);
if (ls_count == 1) {
dlm_scand_stop();
dlm_lowcomms_shutdown();
}
dlm_callback_stop(ls); dlm_callback_stop(ls);
remove_lockspace(ls); remove_lockspace(ls);
...@@ -880,7 +882,7 @@ int dlm_release_lockspace(void *lockspace, int force) ...@@ -880,7 +882,7 @@ int dlm_release_lockspace(void *lockspace, int force)
if (!error) if (!error)
ls_count--; ls_count--;
if (!ls_count) if (!ls_count)
threads_stop(); dlm_lowcomms_stop();
mutex_unlock(&ls_lock); mutex_unlock(&ls_lock);
return error; return error;
......
...@@ -102,6 +102,9 @@ struct listen_connection { ...@@ -102,6 +102,9 @@ struct listen_connection {
struct work_struct rwork; struct work_struct rwork;
}; };
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
/* An entry waiting to be sent */ /* An entry waiting to be sent */
struct writequeue_entry { struct writequeue_entry {
struct list_head list; struct list_head list;
...@@ -116,6 +119,7 @@ struct writequeue_entry { ...@@ -116,6 +119,7 @@ struct writequeue_entry {
struct dlm_node_addr { struct dlm_node_addr {
struct list_head list; struct list_head list;
int nodeid; int nodeid;
int mark;
int addr_count; int addr_count;
int curr_addr_index; int curr_addr_index;
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
...@@ -134,7 +138,7 @@ static DEFINE_SPINLOCK(dlm_node_addrs_spin); ...@@ -134,7 +138,7 @@ static DEFINE_SPINLOCK(dlm_node_addrs_spin);
static struct listen_connection listen_con; static struct listen_connection listen_con;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count; static int dlm_local_count;
static int dlm_allow_conn; int dlm_allow_conn;
/* Work queues */ /* Work queues */
static struct workqueue_struct *recv_workqueue; static struct workqueue_struct *recv_workqueue;
...@@ -303,7 +307,8 @@ static int addr_compare(const struct sockaddr_storage *x, ...@@ -303,7 +307,8 @@ static int addr_compare(const struct sockaddr_storage *x,
} }
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
struct sockaddr *sa_out, bool try_new_addr) struct sockaddr *sa_out, bool try_new_addr,
unsigned int *mark)
{ {
struct sockaddr_storage sas; struct sockaddr_storage sas;
struct dlm_node_addr *na; struct dlm_node_addr *na;
...@@ -331,6 +336,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, ...@@ -331,6 +336,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
if (!na->addr_count) if (!na->addr_count)
return -ENOENT; return -ENOENT;
*mark = na->mark;
if (sas_out) if (sas_out)
memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
...@@ -350,7 +357,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, ...@@ -350,7 +357,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
return 0; return 0;
} }
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
unsigned int *mark)
{ {
struct dlm_node_addr *na; struct dlm_node_addr *na;
int rv = -EEXIST; int rv = -EEXIST;
...@@ -364,6 +372,7 @@ static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) ...@@ -364,6 +372,7 @@ static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
for (addr_i = 0; addr_i < na->addr_count; addr_i++) { for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
if (addr_compare(na->addr[addr_i], addr)) { if (addr_compare(na->addr[addr_i], addr)) {
*nodeid = na->nodeid; *nodeid = na->nodeid;
*mark = na->mark;
rv = 0; rv = 0;
goto unlock; goto unlock;
} }
...@@ -412,6 +421,7 @@ int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) ...@@ -412,6 +421,7 @@ int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
new_node->nodeid = nodeid; new_node->nodeid = nodeid;
new_node->addr[0] = new_addr; new_node->addr[0] = new_addr;
new_node->addr_count = 1; new_node->addr_count = 1;
new_node->mark = dlm_config.ci_mark;
list_add(&new_node->list, &dlm_node_addrs); list_add(&new_node->list, &dlm_node_addrs);
spin_unlock(&dlm_node_addrs_spin); spin_unlock(&dlm_node_addrs_spin);
return 0; return 0;
...@@ -519,6 +529,23 @@ int dlm_lowcomms_connect_node(int nodeid) ...@@ -519,6 +529,23 @@ int dlm_lowcomms_connect_node(int nodeid)
return 0; return 0;
} }
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
struct dlm_node_addr *na;
spin_lock(&dlm_node_addrs_spin);
na = find_node_addr(nodeid);
if (!na) {
spin_unlock(&dlm_node_addrs_spin);
return -ENOENT;
}
na->mark = mark;
spin_unlock(&dlm_node_addrs_spin);
return 0;
}
static void lowcomms_error_report(struct sock *sk) static void lowcomms_error_report(struct sock *sk)
{ {
struct connection *con; struct connection *con;
...@@ -685,10 +712,7 @@ static void shutdown_connection(struct connection *con) ...@@ -685,10 +712,7 @@ static void shutdown_connection(struct connection *con)
{ {
int ret; int ret;
if (cancel_work_sync(&con->swork)) { flush_work(&con->swork);
log_print("canceled swork for node %d", con->nodeid);
clear_bit(CF_WRITE_PENDING, &con->flags);
}
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
/* nothing to shutdown */ /* nothing to shutdown */
...@@ -867,7 +891,7 @@ static int accept_from_sock(struct listen_connection *con) ...@@ -867,7 +891,7 @@ static int accept_from_sock(struct listen_connection *con)
/* Get the new node's NODEID */ /* Get the new node's NODEID */
make_sockaddr(&peeraddr, 0, &len); make_sockaddr(&peeraddr, 0, &len);
if (addr_to_nodeid(&peeraddr, &nodeid)) { if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
unsigned char *b=(unsigned char *)&peeraddr; unsigned char *b=(unsigned char *)&peeraddr;
log_print("connect from non cluster node"); log_print("connect from non cluster node");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
...@@ -876,9 +900,6 @@ static int accept_from_sock(struct listen_connection *con) ...@@ -876,9 +900,6 @@ static int accept_from_sock(struct listen_connection *con)
return -1; return -1;
} }
dlm_comm_mark(nodeid, &mark);
sock_set_mark(newsock->sk, mark);
log_print("got connection from %d", nodeid); log_print("got connection from %d", nodeid);
/* Check to see if we already have a connection to this node. This /* Check to see if we already have a connection to this node. This
...@@ -892,6 +913,8 @@ static int accept_from_sock(struct listen_connection *con) ...@@ -892,6 +913,8 @@ static int accept_from_sock(struct listen_connection *con)
goto accept_err; goto accept_err;
} }
sock_set_mark(newsock->sk, mark);
mutex_lock(&newcon->sock_mutex); mutex_lock(&newcon->sock_mutex);
if (newcon->sock) { if (newcon->sock) {
struct connection *othercon = newcon->othercon; struct connection *othercon = newcon->othercon;
...@@ -908,16 +931,18 @@ static int accept_from_sock(struct listen_connection *con) ...@@ -908,16 +931,18 @@ static int accept_from_sock(struct listen_connection *con)
result = dlm_con_init(othercon, nodeid); result = dlm_con_init(othercon, nodeid);
if (result < 0) { if (result < 0) {
kfree(othercon); kfree(othercon);
mutex_unlock(&newcon->sock_mutex);
goto accept_err; goto accept_err;
} }
lockdep_set_subclass(&othercon->sock_mutex, 1);
newcon->othercon = othercon; newcon->othercon = othercon;
} else { } else {
/* close other sock con if we have something new */ /* close other sock con if we have something new */
close_connection(othercon, false, true, false); close_connection(othercon, false, true, false);
} }
mutex_lock_nested(&othercon->sock_mutex, 1); mutex_lock(&othercon->sock_mutex);
add_sock(newsock, othercon); add_sock(newsock, othercon);
addcon = othercon; addcon = othercon;
mutex_unlock(&othercon->sock_mutex); mutex_unlock(&othercon->sock_mutex);
...@@ -930,6 +955,7 @@ static int accept_from_sock(struct listen_connection *con) ...@@ -930,6 +955,7 @@ static int accept_from_sock(struct listen_connection *con)
addcon = newcon; addcon = newcon;
} }
set_bit(CF_CONNECTED, &addcon->flags);
mutex_unlock(&newcon->sock_mutex); mutex_unlock(&newcon->sock_mutex);
/* /*
...@@ -1015,8 +1041,6 @@ static void sctp_connect_to_sock(struct connection *con) ...@@ -1015,8 +1041,6 @@ static void sctp_connect_to_sock(struct connection *con)
struct socket *sock; struct socket *sock;
unsigned int mark; unsigned int mark;
dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
/* Some odd races can cause double-connects, ignore them */ /* Some odd races can cause double-connects, ignore them */
...@@ -1029,7 +1053,7 @@ static void sctp_connect_to_sock(struct connection *con) ...@@ -1029,7 +1053,7 @@ static void sctp_connect_to_sock(struct connection *con)
} }
memset(&daddr, 0, sizeof(daddr)); memset(&daddr, 0, sizeof(daddr));
result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
if (result < 0) { if (result < 0) {
log_print("no address for nodeid %d", con->nodeid); log_print("no address for nodeid %d", con->nodeid);
goto out; goto out;
...@@ -1104,13 +1128,11 @@ static void sctp_connect_to_sock(struct connection *con) ...@@ -1104,13 +1128,11 @@ static void sctp_connect_to_sock(struct connection *con)
static void tcp_connect_to_sock(struct connection *con) static void tcp_connect_to_sock(struct connection *con)
{ {
struct sockaddr_storage saddr, src_addr; struct sockaddr_storage saddr, src_addr;
unsigned int mark;
int addr_len; int addr_len;
struct socket *sock = NULL; struct socket *sock = NULL;
unsigned int mark;
int result; int result;
dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
if (con->retries++ > MAX_CONNECT_RETRIES) if (con->retries++ > MAX_CONNECT_RETRIES)
goto out; goto out;
...@@ -1125,15 +1147,15 @@ static void tcp_connect_to_sock(struct connection *con) ...@@ -1125,15 +1147,15 @@ static void tcp_connect_to_sock(struct connection *con)
if (result < 0) if (result < 0)
goto out_err; goto out_err;
sock_set_mark(sock->sk, mark);
memset(&saddr, 0, sizeof(saddr)); memset(&saddr, 0, sizeof(saddr));
result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
if (result < 0) { if (result < 0) {
log_print("no address for nodeid %d", con->nodeid); log_print("no address for nodeid %d", con->nodeid);
goto out_err; goto out_err;
} }
sock_set_mark(sock->sk, mark);
add_sock(sock, con); add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid /* Bind to our cluster-known address connecting to avoid
...@@ -1330,70 +1352,72 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, ...@@ -1330,70 +1352,72 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
{ {
struct writequeue_entry *entry; struct writequeue_entry *entry;
entry = kmalloc(sizeof(struct writequeue_entry), allocation); entry = kzalloc(sizeof(*entry), allocation);
if (!entry) if (!entry)
return NULL; return NULL;
entry->page = alloc_page(allocation); entry->page = alloc_page(allocation | __GFP_ZERO);
if (!entry->page) { if (!entry->page) {
kfree(entry); kfree(entry);
return NULL; return NULL;
} }
entry->offset = 0;
entry->len = 0;
entry->end = 0;
entry->users = 0;
entry->con = con; entry->con = con;
entry->users = 1;
return entry; return entry;
} }
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
gfp_t allocation, char **ppc)
{ {
struct connection *con;
struct writequeue_entry *e; struct writequeue_entry *e;
int offset = 0;
if (len > LOWCOMMS_MAX_TX_BUFFER_LEN) {
BUILD_BUG_ON(PAGE_SIZE < LOWCOMMS_MAX_TX_BUFFER_LEN);
log_print("failed to allocate a buffer of size %d", len);
return NULL;
}
con = nodeid2con(nodeid, allocation);
if (!con)
return NULL;
spin_lock(&con->writequeue_lock); spin_lock(&con->writequeue_lock);
e = list_entry(con->writequeue.prev, struct writequeue_entry, list); if (!list_empty(&con->writequeue)) {
if ((&e->list == &con->writequeue) || e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
(PAGE_SIZE - e->end < len)) { if (DLM_WQ_REMAIN_BYTES(e) >= len) {
e = NULL; *ppc = page_address(e->page) + e->end;
} else {
offset = e->end;
e->end += len; e->end += len;
e->users++; e->users++;
}
spin_unlock(&con->writequeue_lock); spin_unlock(&con->writequeue_lock);
if (e) {
got_one:
*ppc = page_address(e->page) + offset;
return e; return e;
} }
}
spin_unlock(&con->writequeue_lock);
e = new_writequeue_entry(con, allocation); e = new_writequeue_entry(con, allocation);
if (e) { if (!e)
spin_lock(&con->writequeue_lock); return NULL;
offset = e->end;
*ppc = page_address(e->page);
e->end += len; e->end += len;
e->users++;
spin_lock(&con->writequeue_lock);
list_add_tail(&e->list, &con->writequeue); list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock); spin_unlock(&con->writequeue_lock);
goto got_one;
return e;
};
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
struct connection *con;
if (len > DEFAULT_BUFFER_SIZE ||
len < sizeof(struct dlm_header)) {
BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
log_print("failed to allocate a buffer of size %d", len);
WARN_ON(1);
return NULL;
} }
con = nodeid2con(nodeid, allocation);
if (!con)
return NULL; return NULL;
return new_wq_entry(con, len, allocation, ppc);
} }
void dlm_lowcomms_commit_buffer(void *mh) void dlm_lowcomms_commit_buffer(void *mh)
...@@ -1406,7 +1430,8 @@ void dlm_lowcomms_commit_buffer(void *mh) ...@@ -1406,7 +1430,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
users = --e->users; users = --e->users;
if (users) if (users)
goto out; goto out;
e->len = e->end - e->offset;
e->len = DLM_WQ_LENGTH_BYTES(e);
spin_unlock(&con->writequeue_lock); spin_unlock(&con->writequeue_lock);
queue_work(send_workqueue, &con->swork); queue_work(send_workqueue, &con->swork);
...@@ -1432,11 +1457,10 @@ static void send_to_sock(struct connection *con) ...@@ -1432,11 +1457,10 @@ static void send_to_sock(struct connection *con)
spin_lock(&con->writequeue_lock); spin_lock(&con->writequeue_lock);
for (;;) { for (;;) {
e = list_entry(con->writequeue.next, struct writequeue_entry, if (list_empty(&con->writequeue))
list);
if ((struct list_head *) e == &con->writequeue)
break; break;
e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
len = e->len; len = e->len;
offset = e->offset; offset = e->offset;
BUG_ON(len == 0 && e->users == 0); BUG_ON(len == 0 && e->users == 0);
...@@ -1589,6 +1613,29 @@ static int work_start(void) ...@@ -1589,6 +1613,29 @@ static int work_start(void)
return 0; return 0;
} }
static void shutdown_conn(struct connection *con)
{
if (con->shutdown_action)
con->shutdown_action(con);
}
void dlm_lowcomms_shutdown(void)
{
/* Set all the flags to prevent any
* socket activity.
*/
dlm_allow_conn = 0;
if (recv_workqueue)
flush_workqueue(recv_workqueue);
if (send_workqueue)
flush_workqueue(send_workqueue);
dlm_close_sock(&listen_con.sock);
foreach_conn(shutdown_conn);
}
static void _stop_conn(struct connection *con, bool and_other) static void _stop_conn(struct connection *con, bool and_other)
{ {
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
...@@ -1610,12 +1657,6 @@ static void stop_conn(struct connection *con) ...@@ -1610,12 +1657,6 @@ static void stop_conn(struct connection *con)
_stop_conn(con, true); _stop_conn(con, true);
} }
static void shutdown_conn(struct connection *con)
{
if (con->shutdown_action)
con->shutdown_action(con);
}
static void connection_release(struct rcu_head *rcu) static void connection_release(struct rcu_head *rcu)
{ {
struct connection *con = container_of(rcu, struct connection, rcu); struct connection *con = container_of(rcu, struct connection, rcu);
...@@ -1672,19 +1713,6 @@ static void work_flush(void) ...@@ -1672,19 +1713,6 @@ static void work_flush(void)
void dlm_lowcomms_stop(void) void dlm_lowcomms_stop(void)
{ {
/* Set all the flags to prevent any
socket activity.
*/
dlm_allow_conn = 0;
if (recv_workqueue)
flush_workqueue(recv_workqueue);
if (send_workqueue)
flush_workqueue(send_workqueue);
dlm_close_sock(&listen_con.sock);
foreach_conn(shutdown_conn);
work_flush(); work_flush();
foreach_conn(free_conn); foreach_conn(free_conn);
work_stop(); work_stop();
......
...@@ -14,13 +14,18 @@ ...@@ -14,13 +14,18 @@
#define LOWCOMMS_MAX_TX_BUFFER_LEN 4096 #define LOWCOMMS_MAX_TX_BUFFER_LEN 4096
/* switch to check if dlm is running */
extern int dlm_allow_conn;
int dlm_lowcomms_start(void); int dlm_lowcomms_start(void);
void dlm_lowcomms_shutdown(void);
void dlm_lowcomms_stop(void); void dlm_lowcomms_stop(void);
void dlm_lowcomms_exit(void); void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid); int dlm_lowcomms_close(int nodeid);
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc); void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc);
void dlm_lowcomms_commit_buffer(void *mh); void dlm_lowcomms_commit_buffer(void *mh);
int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
#endif /* __LOWCOMMS_DOT_H__ */ #endif /* __LOWCOMMS_DOT_H__ */
......
...@@ -22,8 +22,6 @@ ...@@ -22,8 +22,6 @@
* into packets and sends them to the comms layer. * into packets and sends them to the comms layer.
*/ */
#include <asm/unaligned.h>
#include "dlm_internal.h" #include "dlm_internal.h"
#include "lowcomms.h" #include "lowcomms.h"
#include "config.h" #include "config.h"
...@@ -45,13 +43,22 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) ...@@ -45,13 +43,22 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
while (len >= sizeof(struct dlm_header)) { while (len >= sizeof(struct dlm_header)) {
hd = (struct dlm_header *)ptr; hd = (struct dlm_header *)ptr;
/* no message should be more than this otherwise we /* no message should be more than DEFAULT_BUFFER_SIZE or
* cannot deliver this message to upper layers * less than dlm_header size.
*
* Some messages does not have a 8 byte length boundary yet
* which can occur in a unaligned memory access of some dlm
* messages. However this problem need to be fixed at the
* sending side, for now it seems nobody run into architecture
* related issues yet but it slows down some processing.
* Fixing this issue should be scheduled in future by doing
* the next major version bump.
*/ */
msglen = get_unaligned_le16(&hd->h_length); msglen = le16_to_cpu(hd->h_length);
if (msglen > DEFAULT_BUFFER_SIZE) { if (msglen > DEFAULT_BUFFER_SIZE ||
log_print("received invalid length header: %u, will abort message parsing", msglen < sizeof(struct dlm_header)) {
msglen); log_print("received invalid length header: %u from node %d, will abort message parsing",
msglen, nodeid);
return -EBADMSG; return -EBADMSG;
} }
...@@ -84,15 +91,7 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) ...@@ -84,15 +91,7 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
goto skip; goto skip;
} }
/* for aligned memory access, we just copy current message dlm_receive_buffer((union dlm_packet *)ptr, nodeid);
* to begin of the buffer which contains already parsed buffer
* data and should provide align access for upper layers
* because the start address of the buffer has a aligned
* address. This memmove can be removed when the upperlayer
* is capable of unaligned memory access.
*/
memmove(buf, ptr, msglen);
dlm_receive_buffer((union dlm_packet *)buf, nodeid);
skip: skip:
ret += msglen; ret += msglen;
......
...@@ -41,7 +41,6 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, ...@@ -41,7 +41,6 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
to_nodeid, type, len); to_nodeid, type, len);
return -ENOBUFS; return -ENOBUFS;
} }
memset(mb, 0, mb_len);
rc = (struct dlm_rcom *) mb; rc = (struct dlm_rcom *) mb;
...@@ -462,7 +461,6 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) ...@@ -462,7 +461,6 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb); mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
if (!mh) if (!mh)
return -ENOBUFS; return -ENOBUFS;
memset(mb, 0, mb_len);
rc = (struct dlm_rcom *) mb; rc = (struct dlm_rcom *) mb;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment