Commit 805de022 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm:
  dlm: fix length calculation in compat code
  dlm: ignore cancel on granted lock
  dlm: clear defunct cancel state
  dlm: replace idr with hash table for connections
  dlm: comment typo fixes
  dlm: use ipv6_addr_copy
  dlm: Change rwlock which is only used in write mode to a spinlock
parents 7c757eb9 1fecb1c4
......@@ -156,7 +156,7 @@ void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen
bucket = dir_hash(ls, name, namelen);
write_lock(&ls->ls_dirtbl[bucket].lock);
spin_lock(&ls->ls_dirtbl[bucket].lock);
de = search_bucket(ls, name, namelen, bucket);
......@@ -173,7 +173,7 @@ void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen
list_del(&de->list);
kfree(de);
out:
write_unlock(&ls->ls_dirtbl[bucket].lock);
spin_unlock(&ls->ls_dirtbl[bucket].lock);
}
void dlm_dir_clear(struct dlm_ls *ls)
......@@ -185,14 +185,14 @@ void dlm_dir_clear(struct dlm_ls *ls)
DLM_ASSERT(list_empty(&ls->ls_recover_list), );
for (i = 0; i < ls->ls_dirtbl_size; i++) {
write_lock(&ls->ls_dirtbl[i].lock);
spin_lock(&ls->ls_dirtbl[i].lock);
head = &ls->ls_dirtbl[i].list;
while (!list_empty(head)) {
de = list_entry(head->next, struct dlm_direntry, list);
list_del(&de->list);
put_free_de(ls, de);
}
write_unlock(&ls->ls_dirtbl[i].lock);
spin_unlock(&ls->ls_dirtbl[i].lock);
}
}
......@@ -307,17 +307,17 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
bucket = dir_hash(ls, name, namelen);
write_lock(&ls->ls_dirtbl[bucket].lock);
spin_lock(&ls->ls_dirtbl[bucket].lock);
de = search_bucket(ls, name, namelen, bucket);
if (de) {
*r_nodeid = de->master_nodeid;
write_unlock(&ls->ls_dirtbl[bucket].lock);
spin_unlock(&ls->ls_dirtbl[bucket].lock);
if (*r_nodeid == nodeid)
return -EEXIST;
return 0;
}
write_unlock(&ls->ls_dirtbl[bucket].lock);
spin_unlock(&ls->ls_dirtbl[bucket].lock);
if (namelen > DLM_RESNAME_MAXLEN)
return -EINVAL;
......@@ -330,7 +330,7 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
de->length = namelen;
memcpy(de->name, name, namelen);
write_lock(&ls->ls_dirtbl[bucket].lock);
spin_lock(&ls->ls_dirtbl[bucket].lock);
tmp = search_bucket(ls, name, namelen, bucket);
if (tmp) {
kfree(de);
......@@ -339,7 +339,7 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
}
*r_nodeid = de->master_nodeid;
write_unlock(&ls->ls_dirtbl[bucket].lock);
spin_unlock(&ls->ls_dirtbl[bucket].lock);
return 0;
}
......
......@@ -99,7 +99,7 @@ struct dlm_direntry {
struct dlm_dirtable {
struct list_head list;
rwlock_t lock;
spinlock_t lock;
};
struct dlm_rsbtable {
......
......@@ -835,7 +835,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
lkb->lkb_wait_count++;
hold_lkb(lkb);
log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
lkb->lkb_id, lkb->lkb_wait_type, mstype,
lkb->lkb_wait_count, lkb->lkb_flags);
goto out;
......@@ -851,7 +851,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
out:
if (error)
log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, lkb->lkb_flags, mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
mutex_unlock(&ls->ls_waiters_mutex);
......@@ -863,23 +863,55 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
request reply on the requestqueue) between dlm_recover_waiters_pre() which
set RESEND and dlm_recover_waiters_post() */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
struct dlm_message *ms)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int overlap_done = 0;
if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
overlap_done = 1;
goto out_del;
}
if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
overlap_done = 1;
goto out_del;
}
/* Cancel state was preemptively cleared by a successful convert,
see next comment, nothing to do. */
if ((mstype == DLM_MSG_CANCEL_REPLY) &&
(lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
log_debug(ls, "remwait %x cancel_reply wait_type %d",
lkb->lkb_id, lkb->lkb_wait_type);
return -1;
}
/* Remove for the convert reply, and premptively remove for the
cancel reply. A convert has been granted while there's still
an outstanding cancel on it (the cancel is moot and the result
in the cancel reply should be 0). We preempt the cancel reply
because the app gets the convert result and then can follow up
with another op, like convert. This subsequent op would see the
lingering state of the cancel and fail with -EBUSY. */
if ((mstype == DLM_MSG_CONVERT_REPLY) &&
(lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
is_overlap_cancel(lkb) && ms && !ms->m_result) {
log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
lkb->lkb_id);
lkb->lkb_wait_type = 0;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_wait_count--;
goto out_del;
}
/* N.B. type of reply may not always correspond to type of original
msg due to lookup->request optimization, verify others? */
......@@ -888,8 +920,8 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
goto out_del;
}
log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
log_error(ls, "remwait error %x reply %d flags %x no wait_type",
lkb->lkb_id, mstype, lkb->lkb_flags);
return -1;
out_del:
......@@ -899,7 +931,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
this would happen */
if (overlap_done && lkb->lkb_wait_type) {
log_error(ls, "remove_from_waiters %x reply %d give up on %d",
log_error(ls, "remwait error %x reply %d wait_type %d overlap",
lkb->lkb_id, mstype, lkb->lkb_wait_type);
lkb->lkb_wait_count--;
lkb->lkb_wait_type = 0;
......@@ -921,7 +953,7 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
int error;
mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb, mstype);
error = _remove_from_waiters(lkb, mstype, NULL);
mutex_unlock(&ls->ls_waiters_mutex);
return error;
}
......@@ -936,7 +968,7 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
if (ms != &ls->ls_stub_ms)
mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb, ms->m_type);
error = _remove_from_waiters(lkb, ms->m_type, ms);
if (ms != &ls->ls_stub_ms)
mutex_unlock(&ls->ls_waiters_mutex);
return error;
......@@ -2083,6 +2115,11 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
lkb->lkb_timeout_cs = args->timeout;
rv = 0;
out:
if (rv)
log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
lkb->lkb_status, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
return rv;
}
......@@ -2149,6 +2186,13 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
goto out;
}
/* there's nothing to cancel */
if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
!lkb->lkb_wait_type) {
rv = -EBUSY;
goto out;
}
switch (lkb->lkb_wait_type) {
case DLM_MSG_LOOKUP:
case DLM_MSG_REQUEST:
......
......@@ -487,7 +487,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
goto out_lkbfree;
for (i = 0; i < size; i++) {
INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
rwlock_init(&ls->ls_dirtbl[i].lock);
spin_lock_init(&ls->ls_dirtbl[i].lock);
}
INIT_LIST_HEAD(&ls->ls_waiters);
......
......@@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
......@@ -21,7 +21,7 @@
*
* Cluster nodes are referred to by their nodeids. nodeids are
* simply 32 bit numbers to the locking module - if they need to
* be expanded for the cluster infrastructure then that is it's
* be expanded for the cluster infrastructure then that is its
* responsibility. It is this layer's
* responsibility to resolve these into IP address or
* whatever it needs for inter-node communication.
......@@ -36,9 +36,9 @@
* of high load. Also, this way, the sending thread can collect together
* messages bound for one node and send them in one block.
*
* lowcomms will choose to use wither TCP or SCTP as its transport layer
* lowcomms will choose to use either TCP or SCTP as its transport layer
* depending on the configuration variable 'protocol'. This should be set
* to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a
* to 0 (default) for TCP or 1 for SCTP. It should be configured using a
* cluster-wide mechanism as it must be the same on all nodes of the cluster
* for the DLM to function.
*
......@@ -48,11 +48,11 @@
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/idr.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <net/sctp/user.h>
#include <net/ipv6.h>
#include "dlm_internal.h"
#include "lowcomms.h"
......@@ -60,6 +60,7 @@
#include "config.h"
#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32
struct cbuf {
unsigned int base;
......@@ -114,6 +115,7 @@ struct connection {
int retries;
#define MAX_CONNECT_RETRIES 3
int sctp_assoc;
struct hlist_node list;
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
......@@ -138,14 +140,37 @@ static int dlm_local_count;
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;
static DEFINE_IDR(connections_idr);
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static int max_nodeid;
static struct kmem_cache *con_cache;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
/* This is deliberately very simple because most clusters have simple
sequential nodeids, so we should be able to go straight to a connection
struct in the array */
static inline int nodeid_hash(int nodeid)
{
return nodeid & (CONN_HASH_SIZE-1);
}
static struct connection *__find_con(int nodeid)
{
int r;
struct hlist_node *h;
struct connection *con;
r = nodeid_hash(nodeid);
hlist_for_each_entry(con, h, &connection_hash[r], list) {
if (con->nodeid == nodeid)
return con;
}
return NULL;
}
/*
* If 'allocation' is zero then we don't attempt to create a new
* connection structure for this node.
......@@ -154,31 +179,17 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
struct connection *con = NULL;
int r;
int n;
con = idr_find(&connections_idr, nodeid);
con = __find_con(nodeid);
if (con || !alloc)
return con;
r = idr_pre_get(&connections_idr, alloc);
if (!r)
return NULL;
con = kmem_cache_zalloc(con_cache, alloc);
if (!con)
return NULL;
r = idr_get_new_above(&connections_idr, con, nodeid, &n);
if (r) {
kmem_cache_free(con_cache, con);
return NULL;
}
if (n != nodeid) {
idr_remove(&connections_idr, n);
kmem_cache_free(con_cache, con);
return NULL;
}
r = nodeid_hash(nodeid);
hlist_add_head(&con->list, &connection_hash[r]);
con->nodeid = nodeid;
mutex_init(&con->sock_mutex);
......@@ -189,19 +200,30 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
/* Setup action pointers for child sockets */
if (con->nodeid) {
struct connection *zerocon = idr_find(&connections_idr, 0);
struct connection *zerocon = __find_con(0);
con->connect_action = zerocon->connect_action;
if (!con->rx_action)
con->rx_action = zerocon->rx_action;
}
if (nodeid > max_nodeid)
max_nodeid = nodeid;
return con;
}
/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
int i;
struct hlist_node *h, *n;
struct connection *con;
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){
conn_func(con);
}
}
}
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
struct connection *con;
......@@ -217,14 +239,17 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
static struct connection *assoc2con(int assoc_id)
{
int i;
struct hlist_node *h;
struct connection *con;
mutex_lock(&connections_lock);
for (i=0; i<=max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con && con->sctp_assoc == assoc_id) {
mutex_unlock(&connections_lock);
return con;
for (i = 0 ; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry(con, h, &connection_hash[i], list) {
if (con && con->sctp_assoc == assoc_id) {
mutex_unlock(&connections_lock);
return con;
}
}
}
mutex_unlock(&connections_lock);
......@@ -250,8 +275,7 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
} else {
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
memcpy(&ret6->sin6_addr, &in6->sin6_addr,
sizeof(in6->sin6_addr));
ipv6_addr_copy(&ret6->sin6_addr, &in6->sin6_addr);
}
return 0;
......@@ -376,25 +400,23 @@ static void sctp_send_shutdown(sctp_assoc_t associd)
log_print("send EOF to node failed: %d", ret);
}
static void sctp_init_failed_foreach(struct connection *con)
{
con->sctp_assoc = 0;
if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
queue_work(send_workqueue, &con->swork);
}
}
/* INIT failed but we don't know which node...
restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
int i;
struct connection *con;
mutex_lock(&connections_lock);
for (i=1; i<=max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (!con)
continue;
con->sctp_assoc = 0;
if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
queue_work(send_workqueue, &con->swork);
}
}
}
foreach_conn(sctp_init_failed_foreach);
mutex_unlock(&connections_lock);
}
......@@ -1313,13 +1335,10 @@ static void send_to_sock(struct connection *con)
static void clean_one_writequeue(struct connection *con)
{
struct list_head *list;
struct list_head *temp;
struct writequeue_entry *e, *safe;
spin_lock(&con->writequeue_lock);
list_for_each_safe(list, temp, &con->writequeue) {
struct writequeue_entry *e =
list_entry(list, struct writequeue_entry, list);
list_for_each_entry_safe(e, safe, &con->writequeue, list) {
list_del(&e->list);
free_entry(e);
}
......@@ -1369,14 +1388,7 @@ static void process_send_sockets(struct work_struct *work)
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
int nodeid;
for (nodeid = 1; nodeid <= max_nodeid; nodeid++) {
struct connection *con = __nodeid2con(nodeid, 0);
if (con)
clean_one_writequeue(con);
}
foreach_conn(clean_one_writequeue);
}
static void work_stop(void)
......@@ -1406,23 +1418,29 @@ static int work_start(void)
return 0;
}
void dlm_lowcomms_stop(void)
static void stop_conn(struct connection *con)
{
int i;
struct connection *con;
con->flags |= 0x0F;
if (con->sock)
con->sock->sk->sk_user_data = NULL;
}
static void free_conn(struct connection *con)
{
close_connection(con, true);
if (con->othercon)
kmem_cache_free(con_cache, con->othercon);
hlist_del(&con->list);
kmem_cache_free(con_cache, con);
}
void dlm_lowcomms_stop(void)
{
/* Set all the flags to prevent any
socket activity.
*/
mutex_lock(&connections_lock);
for (i = 0; i <= max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con) {
con->flags |= 0x0F;
if (con->sock)
con->sock->sk->sk_user_data = NULL;
}
}
foreach_conn(stop_conn);
mutex_unlock(&connections_lock);
work_stop();
......@@ -1430,25 +1448,20 @@ void dlm_lowcomms_stop(void)
mutex_lock(&connections_lock);
clean_writequeues();
for (i = 0; i <= max_nodeid; i++) {
con = __nodeid2con(i, 0);
if (con) {
close_connection(con, true);
if (con->othercon)
kmem_cache_free(con_cache, con->othercon);
kmem_cache_free(con_cache, con);
}
}
max_nodeid = 0;
foreach_conn(free_conn);
mutex_unlock(&connections_lock);
kmem_cache_destroy(con_cache);
idr_init(&connections_idr);
}
int dlm_lowcomms_start(void)
{
int error = -EINVAL;
struct connection *con;
int i;
for (i = 0; i < CONN_HASH_SIZE; i++)
INIT_HLIST_HEAD(&connection_hash[i]);
init_local();
if (!dlm_local_count) {
......
/*
* Copyright (C) 2006-2008 Red Hat, Inc. All rights reserved.
* Copyright (C) 2006-2009 Red Hat, Inc. All rights reserved.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
......@@ -84,7 +84,7 @@ struct dlm_lock_result32 {
static void compat_input(struct dlm_write_request *kb,
struct dlm_write_request32 *kb32,
size_t count)
int namelen)
{
kb->version[0] = kb32->version[0];
kb->version[1] = kb32->version[1];
......@@ -96,8 +96,7 @@ static void compat_input(struct dlm_write_request *kb,
kb->cmd == DLM_USER_REMOVE_LOCKSPACE) {
kb->i.lspace.flags = kb32->i.lspace.flags;
kb->i.lspace.minor = kb32->i.lspace.minor;
memcpy(kb->i.lspace.name, kb32->i.lspace.name, count -
offsetof(struct dlm_write_request32, i.lspace.name));
memcpy(kb->i.lspace.name, kb32->i.lspace.name, namelen);
} else if (kb->cmd == DLM_USER_PURGE) {
kb->i.purge.nodeid = kb32->i.purge.nodeid;
kb->i.purge.pid = kb32->i.purge.pid;
......@@ -115,8 +114,7 @@ static void compat_input(struct dlm_write_request *kb,
kb->i.lock.bastaddr = (void *)(long)kb32->i.lock.bastaddr;
kb->i.lock.lksb = (void *)(long)kb32->i.lock.lksb;
memcpy(kb->i.lock.lvb, kb32->i.lock.lvb, DLM_USER_LVB_LEN);
memcpy(kb->i.lock.name, kb32->i.lock.name, count -
offsetof(struct dlm_write_request32, i.lock.name));
memcpy(kb->i.lock.name, kb32->i.lock.name, namelen);
}
}
......@@ -539,9 +537,16 @@ static ssize_t device_write(struct file *file, const char __user *buf,
#ifdef CONFIG_COMPAT
if (!kbuf->is64bit) {
struct dlm_write_request32 *k32buf;
int namelen = 0;
if (count > sizeof(struct dlm_write_request32))
namelen = count - sizeof(struct dlm_write_request32);
k32buf = (struct dlm_write_request32 *)kbuf;
kbuf = kmalloc(count + 1 + (sizeof(struct dlm_write_request) -
sizeof(struct dlm_write_request32)), GFP_KERNEL);
/* add 1 after namelen so that the name string is terminated */
kbuf = kzalloc(sizeof(struct dlm_write_request) + namelen + 1,
GFP_KERNEL);
if (!kbuf) {
kfree(k32buf);
return -ENOMEM;
......@@ -549,7 +554,8 @@ static ssize_t device_write(struct file *file, const char __user *buf,
if (proc)
set_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags);
compat_input(kbuf, k32buf, count + 1);
compat_input(kbuf, k32buf, namelen);
kfree(k32buf);
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment