Commit 3f67eaed authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-5.17' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set includes the normal collection of minor fixes and cleanups,
  new kmem caches for network messaging structs, a start on some basic
  tracepoints, and some new debugfs files for inserting test messages"

* tag 'dlm-5.17' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: (32 commits)
  fs: dlm: print cluster addr if non-cluster node connects
  fs: dlm: memory cache for lowcomms hotpath
  fs: dlm: memory cache for writequeue_entry
  fs: dlm: memory cache for midcomms hotpath
  fs: dlm: remove wq_alloc mutex
  fs: dlm: use event based wait for pending remove
  fs: dlm: check for pending users filling buffers
  fs: dlm: use list_empty() to check last iteration
  fs: dlm: fix build with CONFIG_IPV6 disabled
  fs: dlm: replace use of socket sk_callback_lock with sock_lock
  fs: dlm: don't call kernel_getpeername() in error_report()
  fs: dlm: fix potential buffer overflow
  fs: dlm:Remove unneeded semicolon
  fs: dlm: remove double list_first_entry call
  fs: dlm: filter user dlm messages for kernel locks
  fs: dlm: add lkb waiters debugfs functionality
  fs: dlm: add lkb debugfs functionality
  fs: dlm: allow create lkb with specific id range
  fs: dlm: add debugfs rawmsg send functionality
  fs: dlm: let handle callback data as void
  ...
parents 8481c323 feae43f8
......@@ -9,6 +9,8 @@
*******************************************************************************
******************************************************************************/
#include <trace/events/dlm.h>
#include "dlm_internal.h"
#include "lock.h"
#include "user.h"
......@@ -254,10 +256,12 @@ void dlm_callback_work(struct work_struct *work)
continue;
} else if (callbacks[i].flags & DLM_CB_BAST) {
bastfn(lkb->lkb_astparam, callbacks[i].mode);
trace_dlm_bast(ls, lkb, callbacks[i].mode);
} else if (callbacks[i].flags & DLM_CB_CAST) {
lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
castfn(lkb->lkb_astparam);
trace_dlm_ast(ls, lkb, lkb->lkb_lksb);
}
}
......@@ -295,7 +299,8 @@ void dlm_callback_suspend(struct dlm_ls *ls)
void dlm_callback_resume(struct dlm_ls *ls)
{
struct dlm_lkb *lkb, *safe;
int count = 0;
int count = 0, sum = 0;
bool empty;
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
......@@ -311,14 +316,17 @@ void dlm_callback_resume(struct dlm_ls *ls)
if (count == MAX_CB_QUEUE)
break;
}
empty = list_empty(&ls->ls_cb_delay);
mutex_unlock(&ls->ls_cb_mutex);
if (count)
log_rinfo(ls, "dlm_callback_resume %d", count);
if (count == MAX_CB_QUEUE) {
sum += count;
if (!empty) {
count = 0;
cond_resched();
goto more;
}
if (sum)
log_rinfo(ls, "%s %d", __func__, sum);
}
......@@ -635,6 +635,35 @@ static int table_open2(struct inode *inode, struct file *file)
return 0;
}
static ssize_t table_write2(struct file *file, const char __user *user_buf,
size_t count, loff_t *ppos)
{
struct seq_file *seq = file->private_data;
int n, len, lkb_nodeid, lkb_status, error;
char name[DLM_RESNAME_MAXLEN + 1] = {};
struct dlm_ls *ls = seq->private;
unsigned int lkb_flags;
char buf[256] = {};
uint32_t lkb_id;
if (copy_from_user(buf, user_buf,
min_t(size_t, sizeof(buf) - 1, count)))
return -EFAULT;
n = sscanf(buf, "%x %" __stringify(DLM_RESNAME_MAXLEN) "s %x %d %d",
&lkb_id, name, &lkb_flags, &lkb_nodeid, &lkb_status);
if (n != 5)
return -EINVAL;
len = strnlen(name, DLM_RESNAME_MAXLEN);
error = dlm_debug_add_lkb(ls, lkb_id, name, len, lkb_flags,
lkb_nodeid, lkb_status);
if (error)
return error;
return count;
}
static int table_open3(struct inode *inode, struct file *file)
{
struct seq_file *seq;
......@@ -675,6 +704,7 @@ static const struct file_operations format2_fops = {
.owner = THIS_MODULE,
.open = table_open2,
.read = seq_read,
.write = table_write2,
.llseek = seq_lseek,
.release = seq_release
};
......@@ -724,10 +754,35 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
return rv;
}
static ssize_t waiters_write(struct file *file, const char __user *user_buf,
size_t count, loff_t *ppos)
{
struct dlm_ls *ls = file->private_data;
int mstype, to_nodeid;
char buf[128] = {};
uint32_t lkb_id;
int n, error;
if (copy_from_user(buf, user_buf,
min_t(size_t, sizeof(buf) - 1, count)))
return -EFAULT;
n = sscanf(buf, "%x %d %d", &lkb_id, &mstype, &to_nodeid);
if (n != 3)
return -EINVAL;
error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid);
if (error)
return error;
return count;
}
static const struct file_operations waiters_fops = {
.owner = THIS_MODULE,
.open = simple_open,
.read = waiters_read,
.write = waiters_write,
.llseek = default_llseek,
};
......@@ -768,6 +823,42 @@ static int dlm_version_show(struct seq_file *file, void *offset)
}
DEFINE_SHOW_ATTRIBUTE(dlm_version);
static ssize_t dlm_rawmsg_write(struct file *fp, const char __user *user_buf,
size_t count, loff_t *ppos)
{
void *buf;
int ret;
if (count > PAGE_SIZE || count < sizeof(struct dlm_header))
return -EINVAL;
buf = kmalloc(PAGE_SIZE, GFP_NOFS);
if (!buf)
return -ENOMEM;
if (copy_from_user(buf, user_buf, count)) {
ret = -EFAULT;
goto out;
}
ret = dlm_midcomms_rawmsg_send(fp->private_data, buf, count);
if (ret)
goto out;
kfree(buf);
return count;
out:
kfree(buf);
return ret;
}
static const struct file_operations dlm_rawmsg_fops = {
.open = simple_open,
.write = dlm_rawmsg_write,
.llseek = no_llseek,
};
void *dlm_create_debug_comms_file(int nodeid, void *data)
{
struct dentry *d_node;
......@@ -782,6 +873,7 @@ void *dlm_create_debug_comms_file(int nodeid, void *data)
debugfs_create_file("send_queue_count", 0444, d_node, data,
&dlm_send_queue_cnt_fops);
debugfs_create_file("version", 0444, d_node, data, &dlm_version_fops);
debugfs_create_file("rawmsg", 0200, d_node, data, &dlm_rawmsg_fops);
return d_node;
}
......@@ -809,7 +901,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_locks", ls->ls_name);
ls->ls_debug_locks_dentry = debugfs_create_file(name,
S_IFREG | S_IRUGO,
0644,
dlm_root,
ls,
&format2_fops);
......@@ -840,7 +932,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_waiters", ls->ls_name);
ls->ls_debug_waiters_dentry = debugfs_create_file(name,
S_IFREG | S_IRUGO,
0644,
dlm_root,
ls,
&waiters_fops);
......
......@@ -84,8 +84,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
for (;;) {
int left;
error = dlm_recovery_stopped(ls);
if (error) {
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
goto out_free;
}
......
......@@ -41,12 +41,6 @@
#include <linux/dlm.h>
#include "config.h"
/* Size of the temp buffer midcomms allocates on the stack.
We try to make this large enough so most messages fit.
FIXME: should sctp make this unnecessary? */
#define DLM_INBUF_LEN 148
struct dlm_ls;
struct dlm_lkb;
struct dlm_rsb;
......@@ -554,8 +548,9 @@ struct dlm_ls {
uint32_t ls_generation;
uint32_t ls_exflags;
int ls_lvblen;
int ls_count; /* refcount of processes in
atomic_t ls_count; /* refcount of processes in
the dlm using this ls */
wait_queue_head_t ls_count_wait;
int ls_create_count; /* create/release refcount */
unsigned long ls_flags; /* LSFL_ */
unsigned long ls_scan_time;
......@@ -581,6 +576,7 @@ struct dlm_ls {
struct list_head ls_new_rsb; /* new rsb structs */
spinlock_t ls_remove_spin;
wait_queue_head_t ls_remove_wait;
char ls_remove_name[DLM_RESNAME_MAXLEN+1];
char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
int ls_remove_len;
......@@ -632,6 +628,8 @@ struct dlm_ls {
struct rw_semaphore ls_in_recovery; /* block local requests */
struct rw_semaphore ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */
atomic_t ls_requestqueue_cnt;
wait_queue_head_t ls_requestqueue_wait;
struct mutex ls_requestqueue_mutex;
struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */
......
......@@ -53,6 +53,8 @@
R: do_xxxx()
L: receive_xxxx_reply() <- R: send_xxxx_reply()
*/
#include <trace/events/dlm.h>
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
......@@ -1178,7 +1180,8 @@ static void detach_lkb(struct dlm_lkb *lkb)
}
}
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
int start, int end)
{
struct dlm_lkb *lkb;
int rv;
......@@ -1199,7 +1202,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
idr_preload(GFP_NOFS);
spin_lock(&ls->ls_lkbidr_spin);
rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
if (rv >= 0)
lkb->lkb_id = rv;
spin_unlock(&ls->ls_lkbidr_spin);
......@@ -1215,6 +1218,11 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
return 0;
}
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
return _create_lkb(ls, lkb_ret, 1, 0);
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
......@@ -1618,21 +1626,24 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
}
/* If there's an rsb for the same resource being removed, ensure
that the remove message is sent before the new lookup message.
It should be rare to need a delay here, but if not, then it may
be worthwhile to add a proper wait mechanism rather than a delay. */
* that the remove message is sent before the new lookup message.
*/
#define DLM_WAIT_PENDING_COND(ls, r) \
(ls->ls_remove_len && \
!rsb_cmp(r, ls->ls_remove_name, \
ls->ls_remove_len))
static void wait_pending_remove(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
restart:
spin_lock(&ls->ls_remove_spin);
if (ls->ls_remove_len &&
!rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
if (DLM_WAIT_PENDING_COND(ls, r)) {
log_debug(ls, "delay lookup for remove dir %d %s",
r->res_dir_nodeid, r->res_name);
r->res_dir_nodeid, r->res_name);
spin_unlock(&ls->ls_remove_spin);
msleep(1);
wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
goto restart;
}
spin_unlock(&ls->ls_remove_spin);
......@@ -1784,6 +1795,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
wake_up(&ls->ls_remove_wait);
send_remove(r);
......@@ -3437,6 +3449,8 @@ int dlm_lock(dlm_lockspace_t *lockspace,
if (error)
goto out;
trace_dlm_lock_start(ls, lkb, mode, flags);
error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
astarg, bast, &args);
if (error)
......@@ -3450,6 +3464,8 @@ int dlm_lock(dlm_lockspace_t *lockspace,
if (error == -EINPROGRESS)
error = 0;
out_put:
trace_dlm_lock_end(ls, lkb, mode, flags, error);
if (convert || error)
__put_lkb(ls, lkb);
if (error == -EAGAIN || error == -EDEADLK)
......@@ -3481,6 +3497,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
if (error)
goto out;
trace_dlm_unlock_start(ls, lkb, flags);
error = set_unlock_args(flags, astarg, &args);
if (error)
goto out_put;
......@@ -3495,6 +3513,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
error = 0;
out_put:
trace_dlm_unlock_end(ls, lkb, flags, error);
dlm_put_lkb(lkb);
out:
dlm_unlock_recovery(ls);
......@@ -3973,6 +3993,14 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
int from = ms->m_header.h_nodeid;
int error = 0;
/* currently mixing of user/kernel locks are not supported */
if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) {
log_error(lkb->lkb_resource->res_ls,
"got user dlm message for a kernel lock");
error = -EINVAL;
goto out;
}
switch (ms->m_type) {
case DLM_MSG_CONVERT:
case DLM_MSG_UNLOCK:
......@@ -4001,6 +4029,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
error = -EINVAL;
}
out:
if (error)
log_error(lkb->lkb_resource->res_ls,
"ignore invalid message %d from %d %x %x %x %d",
......@@ -4050,6 +4079,7 @@ static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
wake_up(&ls->ls_remove_wait);
rv = _create_message(ls, sizeof(struct dlm_message) + len,
dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
......@@ -6301,3 +6331,64 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
return error;
}
/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
{
struct dlm_lksb *lksb;
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error;
/* we currently can't set a valid user lock */
if (lkb_flags & DLM_IFL_USER)
return -EOPNOTSUPP;
lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
if (!lksb)
return -ENOMEM;
error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
if (error) {
kfree(lksb);
return error;
}
lkb->lkb_flags = lkb_flags;
lkb->lkb_nodeid = lkb_nodeid;
lkb->lkb_lksb = lksb;
/* user specific pointer, just don't have it NULL for kernel locks */
if (~lkb_flags & DLM_IFL_USER)
lkb->lkb_astparam = (void *)0xDEADBEEF;
error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
if (error) {
kfree(lksb);
__put_lkb(ls, lkb);
return error;
}
lock_rsb(r);
attach_lkb(r, lkb);
add_lkb(r, lkb, lkb_status);
unlock_rsb(r);
put_rsb(r);
return 0;
}
int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
int mstype, int to_nodeid)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, lkb_id, &lkb);
if (error)
return error;
error = add_to_waiters(lkb, mstype, to_nodeid);
dlm_put_lkb(lkb);
return error;
}
......@@ -58,6 +58,10 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
int nodeid, int pid);
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid);
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
int lkb_nodeid, unsigned int lkb_flags, int lkb_status);
int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
int mstype, int to_nodeid);
static inline int is_master(struct dlm_rsb *r)
{
......
......@@ -314,7 +314,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_global_id == id) {
ls->ls_count++;
atomic_inc(&ls->ls_count);
goto out;
}
}
......@@ -331,7 +331,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
spin_lock(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_local_handle == lockspace) {
ls->ls_count++;
atomic_inc(&ls->ls_count);
goto out;
}
}
......@@ -348,7 +348,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
spin_lock(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_device.minor == minor) {
ls->ls_count++;
atomic_inc(&ls->ls_count);
goto out;
}
}
......@@ -360,24 +360,24 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
void dlm_put_lockspace(struct dlm_ls *ls)
{
spin_lock(&lslist_lock);
ls->ls_count--;
spin_unlock(&lslist_lock);
if (atomic_dec_and_test(&ls->ls_count))
wake_up(&ls->ls_count_wait);
}
static void remove_lockspace(struct dlm_ls *ls)
{
for (;;) {
spin_lock(&lslist_lock);
if (ls->ls_count == 0) {
WARN_ON(ls->ls_create_count != 0);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
return;
}
retry:
wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
spin_lock(&lslist_lock);
if (atomic_read(&ls->ls_count) != 0) {
spin_unlock(&lslist_lock);
ssleep(1);
goto retry;
}
WARN_ON(ls->ls_create_count != 0);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
}
static int threads_start(void)
......@@ -481,7 +481,8 @@ static int new_lockspace(const char *name, const char *cluster,
memcpy(ls->ls_name, name, namelen);
ls->ls_namelen = namelen;
ls->ls_lvblen = lvblen;
ls->ls_count = 0;
atomic_set(&ls->ls_count, 0);
init_waitqueue_head(&ls->ls_count_wait);
ls->ls_flags = 0;
ls->ls_scan_time = jiffies;
......@@ -511,6 +512,7 @@ static int new_lockspace(const char *name, const char *cluster,
}
spin_lock_init(&ls->ls_remove_spin);
init_waitqueue_head(&ls->ls_remove_wait);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
......@@ -564,6 +566,8 @@ static int new_lockspace(const char *name, const char *cluster,
init_rwsem(&ls->ls_in_recovery);
init_rwsem(&ls->ls_recv_active);
INIT_LIST_HEAD(&ls->ls_requestqueue);
atomic_set(&ls->ls_requestqueue_cnt, 0);
init_waitqueue_head(&ls->ls_requestqueue_wait);
mutex_init(&ls->ls_requestqueue_mutex);
mutex_init(&ls->ls_clear_proc_locks);
......@@ -868,7 +872,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
* until this returns.
*
* Force has 4 possible values:
* 0 - don't destroy locksapce if it has any LKBs
* 0 - don't destroy lockspace if it has any LKBs
* 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
* 2 - destroy lockspace regardless of LKBs
* 3 - destroy lockspace as part of a forced shutdown
......
......@@ -53,9 +53,12 @@
#include <net/sctp/sctp.h>
#include <net/ipv6.h>
#include <trace/events/dlm.h>
#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"
#define NEEDED_RMEM (4*1024*1024)
......@@ -84,7 +87,6 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
struct mutex wq_alloc;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
......@@ -189,6 +191,24 @@ static const struct dlm_proto_ops *dlm_proto_ops;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void writequeue_entry_ctor(void *data)
{
struct writequeue_entry *entry = data;
INIT_LIST_HEAD(&entry->msgs);
}
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
0, 0, writequeue_entry_ctor);
}
struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}
/* need to held writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
......@@ -199,7 +219,10 @@ static struct writequeue_entry *con_next_wq(struct connection *con)
e = list_first_entry(&con->writequeue, struct writequeue_entry,
list);
if (e->len == 0)
/* if len is zero nothing is to send, if there are users filling
* buffers we wait until the users are done so we can send more.
*/
if (e->users || e->len == 0)
return NULL;
return e;
......@@ -265,8 +288,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
mutex_init(&con->wq_alloc);
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
......@@ -486,11 +507,9 @@ static void lowcomms_data_ready(struct sock *sk)
{
struct connection *con;
read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
queue_work(recv_workqueue, &con->rwork);
read_unlock_bh(&sk->sk_callback_lock);
}
static void lowcomms_listen_data_ready(struct sock *sk)
......@@ -505,15 +524,14 @@ static void lowcomms_write_space(struct sock *sk)
{
struct connection *con;
read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (!con)
goto out;
return;
if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
log_print("successful connected to node %d", con->nodeid);
queue_work(send_workqueue, &con->swork);
goto out;
return;
}
clear_bit(SOCK_NOSPACE, &con->sock->flags);
......@@ -524,8 +542,6 @@ static void lowcomms_write_space(struct sock *sk)
}
queue_work(send_workqueue, &con->swork);
out:
read_unlock_bh(&sk->sk_callback_lock);
}
static inline void lowcomms_connect_sock(struct connection *con)
......@@ -592,42 +608,41 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
static void lowcomms_error_report(struct sock *sk)
{
struct connection *con;
struct sockaddr_storage saddr;
void (*orig_report)(struct sock *) = NULL;
struct inet_sock *inet;
read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (con == NULL)
goto out;
orig_report = listen_sock.sk_error_report;
if (kernel_getpeername(sk->sk_socket, (struct sockaddr *)&saddr) < 0) {
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d, port %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, dlm_config.ci_tcp_port,
sk->sk_err, sk->sk_err_soft);
} else if (saddr.ss_family == AF_INET) {
struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
inet = inet_sk(sk);
switch (sk->sk_family) {
case AF_INET:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d at %pI4, port %d, "
"sending to node %d at %pI4, dport %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, &sin4->sin_addr.s_addr,
dlm_config.ci_tcp_port, sk->sk_err,
con->nodeid, &inet->inet_daddr,
ntohs(inet->inet_dport), sk->sk_err,
sk->sk_err_soft);
} else {
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
break;
#if IS_ENABLED(CONFIG_IPV6)
case AF_INET6:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d at %u.%u.%u.%u, "
"port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, sin6->sin6_addr.s6_addr32[0],
sin6->sin6_addr.s6_addr32[1],
sin6->sin6_addr.s6_addr32[2],
sin6->sin6_addr.s6_addr32[3],
dlm_config.ci_tcp_port, sk->sk_err,
"sending to node %d at %pI6c, "
"dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
con->nodeid, &sk->sk_v6_daddr,
ntohs(inet->inet_dport), sk->sk_err,
sk->sk_err_soft);
break;
#endif
default:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"invalid socket family %d set, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
sk->sk_family, sk->sk_err, sk->sk_err_soft);
goto out;
}
/* below sendcon only handling */
......@@ -646,7 +661,6 @@ static void lowcomms_error_report(struct sock *sk)
queue_work(send_workqueue, &con->swork);
out:
read_unlock_bh(&sk->sk_callback_lock);
if (orig_report)
orig_report(sk);
}
......@@ -666,20 +680,20 @@ static void restore_callbacks(struct socket *sock)
{
struct sock *sk = sock->sk;
write_lock_bh(&sk->sk_callback_lock);
lock_sock(sk);
sk->sk_user_data = NULL;
sk->sk_data_ready = listen_sock.sk_data_ready;
sk->sk_state_change = listen_sock.sk_state_change;
sk->sk_write_space = listen_sock.sk_write_space;
sk->sk_error_report = listen_sock.sk_error_report;
write_unlock_bh(&sk->sk_callback_lock);
release_sock(sk);
}
static void add_listen_sock(struct socket *sock, struct listen_connection *con)
{
struct sock *sk = sock->sk;
write_lock_bh(&sk->sk_callback_lock);
lock_sock(sk);
save_listen_callbacks(sock);
con->sock = sock;
......@@ -687,7 +701,7 @@ static void add_listen_sock(struct socket *sock, struct listen_connection *con)
sk->sk_allocation = GFP_NOFS;
/* Install a data_ready callback */
sk->sk_data_ready = lowcomms_listen_data_ready;
write_unlock_bh(&sk->sk_callback_lock);
release_sock(sk);
}
/* Make a socket active */
......@@ -695,7 +709,7 @@ static void add_sock(struct socket *sock, struct connection *con)
{
struct sock *sk = sock->sk;
write_lock_bh(&sk->sk_callback_lock);
lock_sock(sk);
con->sock = sock;
sk->sk_user_data = con;
......@@ -705,7 +719,7 @@ static void add_sock(struct socket *sock, struct connection *con)
sk->sk_state_change = lowcomms_state_change;
sk->sk_allocation = GFP_NOFS;
sk->sk_error_report = lowcomms_error_report;
write_unlock_bh(&sk->sk_callback_lock);
release_sock(sk);
}
/* Add the port number to an IPv6 or 4 sockaddr and return the address
......@@ -733,7 +747,7 @@ static void dlm_page_release(struct kref *kref)
ref);
__free_page(e->page);
kfree(e);
dlm_free_writequeue(e);
}
static void dlm_msg_release(struct kref *kref)
......@@ -741,7 +755,7 @@ static void dlm_msg_release(struct kref *kref)
struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
kref_put(&msg->entry->ref, dlm_page_release);
kfree(msg);
dlm_free_msg(msg);
}
static void free_entry(struct writequeue_entry *e)
......@@ -925,6 +939,7 @@ static int receive_from_sock(struct connection *con)
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
msg.msg_flags);
trace_dlm_recv(con->nodeid, ret);
if (ret == -EAGAIN)
break;
else if (ret <= 0)
......@@ -1013,10 +1028,28 @@ static int accept_from_sock(struct listen_connection *con)
/* Get the new node's NODEID */
make_sockaddr(&peeraddr, 0, &len);
if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
unsigned char *b=(unsigned char *)&peeraddr;
log_print("connect from non cluster node");
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
b, sizeof(struct sockaddr_storage));
switch (peeraddr.ss_family) {
case AF_INET: {
struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
log_print("connect from non cluster IPv4 node %pI4",
&sin->sin_addr);
break;
}
#if IS_ENABLED(CONFIG_IPV6)
case AF_INET6: {
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
log_print("connect from non cluster IPv6 node %pI6c",
&sin6->sin6_addr);
break;
}
#endif
default:
log_print("invalid family from non cluster node");
break;
}
sock_release(newsock);
return -1;
}
......@@ -1177,33 +1210,33 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]);
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
gfp_t allocation)
static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
struct writequeue_entry *entry;
entry = kzalloc(sizeof(*entry), allocation);
entry = dlm_allocate_writequeue();
if (!entry)
return NULL;
entry->page = alloc_page(allocation | __GFP_ZERO);
entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
if (!entry->page) {
kfree(entry);
dlm_free_writequeue(entry);
return NULL;
}
entry->offset = 0;
entry->len = 0;
entry->end = 0;
entry->dirty = false;
entry->con = con;
entry->users = 1;
kref_init(&entry->ref);
INIT_LIST_HEAD(&entry->msgs);
return entry;
}
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
gfp_t allocation, char **ppc,
void (*cb)(struct dlm_mhandle *mh),
struct dlm_mhandle *mh)
char **ppc, void (*cb)(void *data),
void *data)
{
struct writequeue_entry *e;
......@@ -1215,74 +1248,54 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
*ppc = page_address(e->page) + e->end;
if (cb)
cb(mh);
cb(data);
e->end += len;
e->users++;
spin_unlock(&con->writequeue_lock);
return e;
goto out;
}
}
spin_unlock(&con->writequeue_lock);
e = new_writequeue_entry(con, allocation);
e = new_writequeue_entry(con);
if (!e)
return NULL;
goto out;
kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
atomic_inc(&con->writequeue_cnt);
spin_lock(&con->writequeue_lock);
if (cb)
cb(mh);
cb(data);
list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock);
out:
spin_unlock(&con->writequeue_lock);
return e;
};
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
gfp_t allocation, char **ppc,
void (*cb)(struct dlm_mhandle *mh),
struct dlm_mhandle *mh)
void (*cb)(void *data),
void *data)
{
struct writequeue_entry *e;
struct dlm_msg *msg;
bool sleepable;
msg = kzalloc(sizeof(*msg), allocation);
msg = dlm_allocate_msg(allocation);
if (!msg)
return NULL;
/* this mutex is being used as a wait to avoid multiple "fast"
* new writequeue page list entry allocs in new_wq_entry in
* normal operation which is sleepable context. Without it
* we could end in multiple writequeue entries with one
* dlm message because multiple callers were waiting at
* the writequeue_lock in new_wq_entry().
*/
sleepable = gfpflags_normal_context(allocation);
if (sleepable)
mutex_lock(&con->wq_alloc);
kref_init(&msg->ref);
e = new_wq_entry(con, len, allocation, ppc, cb, mh);
e = new_wq_entry(con, len, ppc, cb, data);
if (!e) {
if (sleepable)
mutex_unlock(&con->wq_alloc);
kfree(msg);
dlm_free_msg(msg);
return NULL;
}
if (sleepable)
mutex_unlock(&con->wq_alloc);
msg->retransmit = false;
msg->orig_msg = NULL;
msg->ppc = *ppc;
msg->len = len;
msg->entry = e;
......@@ -1291,8 +1304,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
}
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
char **ppc, void (*cb)(struct dlm_mhandle *mh),
struct dlm_mhandle *mh)
char **ppc, void (*cb)(void *data),
void *data)
{
struct connection *con;
struct dlm_msg *msg;
......@@ -1313,7 +1326,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
return NULL;
}
msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
if (!msg) {
srcu_read_unlock(&connections_srcu, idx);
return NULL;
......@@ -1403,7 +1416,6 @@ static void send_to_sock(struct connection *con)
if (!e)
break;
e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
len = e->len;
offset = e->offset;
BUG_ON(len == 0 && e->users == 0);
......@@ -1411,6 +1423,7 @@ static void send_to_sock(struct connection *con)
ret = kernel_sendpage(con->sock, e->page, offset, len,
msg_flags);
trace_dlm_send(con->nodeid, ret);
if (ret == -EAGAIN || ret == 0) {
if (ret == -EAGAIN &&
test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
......@@ -1680,9 +1693,9 @@ static void _stop_conn(struct connection *con, bool and_other)
set_bit(CF_READ_PENDING, &con->flags);
set_bit(CF_WRITE_PENDING, &con->flags);
if (con->sock && con->sock->sk) {
write_lock_bh(&con->sock->sk->sk_callback_lock);
lock_sock(con->sock->sk);
con->sock->sk->sk_user_data = NULL;
write_unlock_bh(&con->sock->sk->sk_callback_lock);
release_sock(con->sock->sk);
}
if (con->othercon && and_other)
_stop_conn(con->othercon, false);
......@@ -1775,7 +1788,7 @@ static int dlm_listen_for_all(void)
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, dlm_proto_ops->proto, &sock);
if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded");
log_print("Can't create comms socket: %d", result);
goto out;
}
......
......@@ -38,8 +38,8 @@ void dlm_lowcomms_stop(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
char **ppc, void (*cb)(struct dlm_mhandle *mh),
struct dlm_mhandle *mh);
char **ppc, void (*cb)(void *data),
void *data);
void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
void dlm_lowcomms_put_msg(struct dlm_msg *msg);
int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
......@@ -47,6 +47,8 @@ int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
void dlm_midcomms_receive_done(int nodeid);
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void);
struct kmem_cache *dlm_lowcomms_msg_cache_create(void);
#endif /* __LOWCOMMS_DOT_H__ */
......@@ -19,6 +19,9 @@
#include "config.h"
#include "lowcomms.h"
#define CREATE_TRACE_POINTS
#include <trace/events/dlm.h>
static int __init init_dlm(void)
{
int error;
......
......@@ -442,8 +442,7 @@ static int ping_members(struct dlm_ls *ls)
int error = 0;
list_for_each_entry(memb, &ls->ls_nodes, list) {
error = dlm_recovery_stopped(ls);
if (error) {
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
break;
}
......
......@@ -10,32 +10,61 @@
******************************************************************************/
#include "dlm_internal.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
static struct kmem_cache *writequeue_cache;
static struct kmem_cache *mhandle_cache;
static struct kmem_cache *msg_cache;
static struct kmem_cache *lkb_cache;
static struct kmem_cache *rsb_cache;
int __init dlm_memory_init(void)
{
writequeue_cache = dlm_lowcomms_writequeue_cache_create();
if (!writequeue_cache)
goto out;
mhandle_cache = dlm_midcomms_cache_create();
if (!mhandle_cache)
goto mhandle;
lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
__alignof__(struct dlm_lkb), 0, NULL);
if (!lkb_cache)
return -ENOMEM;
goto lkb;
msg_cache = dlm_lowcomms_msg_cache_create();
if (!msg_cache)
goto msg;
rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
__alignof__(struct dlm_rsb), 0, NULL);
if (!rsb_cache) {
kmem_cache_destroy(lkb_cache);
return -ENOMEM;
}
if (!rsb_cache)
goto rsb;
return 0;
rsb:
kmem_cache_destroy(msg_cache);
msg:
kmem_cache_destroy(lkb_cache);
lkb:
kmem_cache_destroy(mhandle_cache);
mhandle:
kmem_cache_destroy(writequeue_cache);
out:
return -ENOMEM;
}
void dlm_memory_exit(void)
{
kmem_cache_destroy(writequeue_cache);
kmem_cache_destroy(mhandle_cache);
kmem_cache_destroy(msg_cache);
kmem_cache_destroy(lkb_cache);
kmem_cache_destroy(rsb_cache);
}
......@@ -89,3 +118,32 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
kmem_cache_free(lkb_cache, lkb);
}
struct dlm_mhandle *dlm_allocate_mhandle(void)
{
return kmem_cache_alloc(mhandle_cache, GFP_NOFS);
}
void dlm_free_mhandle(struct dlm_mhandle *mhandle)
{
kmem_cache_free(mhandle_cache, mhandle);
}
struct writequeue_entry *dlm_allocate_writequeue(void)
{
return kmem_cache_alloc(writequeue_cache, GFP_ATOMIC);
}
void dlm_free_writequeue(struct writequeue_entry *writequeue)
{
kmem_cache_free(writequeue_cache, writequeue);
}
struct dlm_msg *dlm_allocate_msg(gfp_t allocation)
{
return kmem_cache_alloc(msg_cache, allocation);
}
void dlm_free_msg(struct dlm_msg *msg)
{
kmem_cache_free(msg_cache, msg);
}
......@@ -20,6 +20,12 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
void dlm_free_lkb(struct dlm_lkb *l);
char *dlm_allocate_lvb(struct dlm_ls *ls);
void dlm_free_lvb(char *l);
struct dlm_mhandle *dlm_allocate_mhandle(void);
void dlm_free_mhandle(struct dlm_mhandle *mhandle);
struct writequeue_entry *dlm_allocate_writequeue(void);
void dlm_free_writequeue(struct writequeue_entry *writequeue);
struct dlm_msg *dlm_allocate_msg(gfp_t allocation);
void dlm_free_msg(struct dlm_msg *msg);
#endif /* __MEMORY_DOT_H__ */
......@@ -137,6 +137,7 @@
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "util.h"
#include "midcomms.h"
......@@ -220,6 +221,12 @@ DEFINE_STATIC_SRCU(nodes_srcu);
*/
static DEFINE_MUTEX(close_lock);
struct kmem_cache *dlm_midcomms_cache_create(void)
{
return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
0, 0, NULL);
}
static inline const char *dlm_state_str(int state)
{
switch (state) {
......@@ -279,7 +286,7 @@ static void dlm_mhandle_release(struct rcu_head *rcu)
struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
dlm_lowcomms_put_msg(mh->msg);
kfree(mh);
dlm_free_mhandle(mh);
}
static void dlm_mhandle_delete(struct midcomms_node *node,
......@@ -909,11 +916,11 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
if (msglen > len)
break;
switch (le32_to_cpu(hd->h_version)) {
case DLM_VERSION_3_1:
switch (hd->h_version) {
case cpu_to_le32(DLM_VERSION_3_1):
dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
break;
case DLM_VERSION_3_2:
case cpu_to_le32(DLM_VERSION_3_2):
dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
break;
default:
......@@ -969,7 +976,7 @@ void dlm_midcomms_receive_done(int nodeid)
spin_unlock(&node->state_lock);
/* do nothing FIN has it's own ack send */
break;
};
}
srcu_read_unlock(&nodes_srcu, idx);
}
......@@ -1020,8 +1027,10 @@ static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
header_out(&opts->o_header);
}
static void midcomms_new_msg_cb(struct dlm_mhandle *mh)
static void midcomms_new_msg_cb(void *data)
{
struct dlm_mhandle *mh = data;
atomic_inc(&mh->node->send_queue_cnt);
spin_lock(&mh->node->send_queue_lock);
......@@ -1071,10 +1080,12 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
/* this is a bug, however we going on and hope it will be resolved */
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
mh = kzalloc(sizeof(*mh), GFP_NOFS);
mh = dlm_allocate_mhandle();
if (!mh)
goto err;
mh->committed = false;
mh->ack_rcv = NULL;
mh->idx = idx;
mh->node = node;
......@@ -1083,7 +1094,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
NULL, NULL);
if (!msg) {
kfree(mh);
dlm_free_mhandle(mh);
goto err;
}
......@@ -1092,13 +1103,13 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
ppc);
if (!msg) {
kfree(mh);
dlm_free_mhandle(mh);
goto err;
}
break;
default:
kfree(mh);
dlm_free_mhandle(mh);
WARN_ON(1);
goto err;
}
......@@ -1134,7 +1145,7 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
dlm_lowcomms_commit_msg(mh->msg);
dlm_lowcomms_put_msg(mh->msg);
/* mh is not part of rcu list in this case */
kfree(mh);
dlm_free_mhandle(mh);
break;
case DLM_VERSION_3_2:
dlm_midcomms_commit_msg_3_2(mh);
......@@ -1231,7 +1242,7 @@ void dlm_midcomms_add_member(int nodeid)
}
node->users++;
pr_debug("users inc count %d\n", node->users);
pr_debug("node %d users inc count %d\n", nodeid, node->users);
spin_unlock(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
......@@ -1254,7 +1265,7 @@ void dlm_midcomms_remove_member(int nodeid)
spin_lock(&node->state_lock);
node->users--;
pr_debug("users dec count %d\n", node->users);
pr_debug("node %d users dec count %d\n", nodeid, node->users);
/* hitting users count to zero means the
* other side is running dlm_midcomms_stop()
......@@ -1425,3 +1436,51 @@ int dlm_midcomms_close(int nodeid)
return ret;
}
/* debug functionality to send raw dlm msg from user space */
struct dlm_rawmsg_data {
struct midcomms_node *node;
void *buf;
};
static void midcomms_new_rawmsg_cb(void *data)
{
struct dlm_rawmsg_data *rd = data;
struct dlm_header *h = rd->buf;
switch (h->h_version) {
case cpu_to_le32(DLM_VERSION_3_1):
break;
default:
switch (h->h_cmd) {
case DLM_OPTS:
if (!h->u.h_seq)
h->u.h_seq = rd->node->seq_send++;
break;
default:
break;
}
break;
}
}
int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
int buflen)
{
struct dlm_rawmsg_data rd;
struct dlm_msg *msg;
char *msgbuf;
rd.node = node;
rd.buf = buf;
msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
&msgbuf, midcomms_new_rawmsg_cb, &rd);
if (!msg)
return -ENOMEM;
memcpy(msgbuf, buf, buflen);
dlm_lowcomms_commit_msg(msg);
return 0;
}
......@@ -28,6 +28,9 @@ const char *dlm_midcomms_state(struct midcomms_node *node);
unsigned long dlm_midcomms_flags(struct midcomms_node *node);
int dlm_midcomms_send_queue_cnt(struct midcomms_node *node);
uint32_t dlm_midcomms_version(struct midcomms_node *node);
int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
int buflen);
struct kmem_cache *dlm_midcomms_cache_create(void);
#endif /* __MIDCOMMS_DOT_H__ */
......@@ -601,7 +601,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
spin_lock(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
stop = dlm_recovery_stopped(ls);
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
......
......@@ -124,8 +124,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_recover_waiters_pre(ls);
error = dlm_recovery_stopped(ls);
if (error) {
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
goto fail;
}
......
......@@ -44,6 +44,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
e->nodeid = nodeid;
memcpy(&e->request, ms, ms->m_header.h_length);
atomic_inc(&ls->ls_requestqueue_cnt);
mutex_lock(&ls->ls_requestqueue_mutex);
list_add_tail(&e->list, &ls->ls_requestqueue);
mutex_unlock(&ls->ls_requestqueue_mutex);
......@@ -89,6 +90,8 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
mutex_lock(&ls->ls_requestqueue_mutex);
list_del(&e->list);
if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
wake_up(&ls->ls_requestqueue_wait);
kfree(e);
if (dlm_locking_stopped(ls)) {
......@@ -115,14 +118,8 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
void dlm_wait_requestqueue(struct dlm_ls *ls)
{
for (;;) {
mutex_lock(&ls->ls_requestqueue_mutex);
if (list_empty(&ls->ls_requestqueue))
break;
mutex_unlock(&ls->ls_requestqueue_mutex);
schedule();
}
mutex_unlock(&ls->ls_requestqueue_mutex);
wait_event(ls->ls_requestqueue_wait,
atomic_read(&ls->ls_requestqueue_cnt) == 0);
}
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
......@@ -130,7 +127,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
uint32_t type = ms->m_type;
/* the ls is being cleaned up and freed by release_lockspace */
if (!ls->ls_count)
if (!atomic_read(&ls->ls_count))
return 1;
if (dlm_is_removed(ls, nodeid))
......@@ -161,6 +158,8 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
if (purge_request(ls, ms, e->nodeid)) {
list_del(&e->list);
if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
wake_up(&ls->ls_requestqueue_wait);
kfree(e);
}
}
......
/* SPDX-License-Identifier: GPL-2.0 */
#undef TRACE_SYSTEM
#define TRACE_SYSTEM dlm
#if !defined(_TRACE_DLM_H) || defined(TRACE_HEADER_MULTI_READ)
#define _TRACE_DLM_H
#include <linux/dlm.h>
#include <linux/dlmconstants.h>
#include <linux/tracepoint.h>
#include "../../../fs/dlm/dlm_internal.h"
#define show_lock_flags(flags) __print_flags(flags, "|", \
{ DLM_LKF_NOQUEUE, "NOQUEUE" }, \
{ DLM_LKF_CANCEL, "CANCEL" }, \
{ DLM_LKF_CONVERT, "CONVERT" }, \
{ DLM_LKF_VALBLK, "VALBLK" }, \
{ DLM_LKF_QUECVT, "QUECVT" }, \
{ DLM_LKF_IVVALBLK, "IVVALBLK" }, \
{ DLM_LKF_CONVDEADLK, "CONVDEADLK" }, \
{ DLM_LKF_PERSISTENT, "PERSISTENT" }, \
{ DLM_LKF_NODLCKWT, "NODLCKWT" }, \
{ DLM_LKF_NODLCKBLK, "NODLCKBLK" }, \
{ DLM_LKF_EXPEDITE, "EXPEDITE" }, \
{ DLM_LKF_NOQUEUEBAST, "NOQUEUEBAST" }, \
{ DLM_LKF_HEADQUE, "HEADQUE" }, \
{ DLM_LKF_NOORDER, "NOORDER" }, \
{ DLM_LKF_ORPHAN, "ORPHAN" }, \
{ DLM_LKF_ALTPR, "ALTPR" }, \
{ DLM_LKF_ALTCW, "ALTCW" }, \
{ DLM_LKF_FORCEUNLOCK, "FORCEUNLOCK" }, \
{ DLM_LKF_TIMEOUT, "TIMEOUT" })
#define show_lock_mode(mode) __print_symbolic(mode, \
{ DLM_LOCK_IV, "IV"}, \
{ DLM_LOCK_NL, "NL"}, \
{ DLM_LOCK_CR, "CR"}, \
{ DLM_LOCK_CW, "CW"}, \
{ DLM_LOCK_PR, "PR"}, \
{ DLM_LOCK_PW, "PW"}, \
{ DLM_LOCK_EX, "EX"})
#define show_dlm_sb_flags(flags) __print_flags(flags, "|", \
{ DLM_SBF_DEMOTED, "DEMOTED" }, \
{ DLM_SBF_VALNOTVALID, "VALNOTVALID" }, \
{ DLM_SBF_ALTMODE, "ALTMODE" })
/* note: we begin tracing dlm_lock_start() only if ls and lkb are found */
TRACE_EVENT(dlm_lock_start,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, int mode,
__u32 flags),
TP_ARGS(ls, lkb, mode, flags),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(int, mode)
__field(__u32, flags)
),
TP_fast_assign(
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->mode = mode;
__entry->flags = flags;
),
TP_printk("ls_id=%u lkb_id=%x mode=%s flags=%s",
__entry->ls_id, __entry->lkb_id,
show_lock_mode(__entry->mode),
show_lock_flags(__entry->flags))
);
TRACE_EVENT(dlm_lock_end,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, int mode, __u32 flags,
int error),
TP_ARGS(ls, lkb, mode, flags, error),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(int, mode)
__field(__u32, flags)
__field(int, error)
),
TP_fast_assign(
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->mode = mode;
__entry->flags = flags;
/* return value will be zeroed in those cases by dlm_lock()
* we do it here again to not introduce more overhead if
* trace isn't running and error reflects the return value.
*/
if (error == -EAGAIN || error == -EDEADLK)
__entry->error = 0;
else
__entry->error = error;
),
TP_printk("ls_id=%u lkb_id=%x mode=%s flags=%s error=%d",
__entry->ls_id, __entry->lkb_id,
show_lock_mode(__entry->mode),
show_lock_flags(__entry->flags), __entry->error)
);
TRACE_EVENT(dlm_bast,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, int mode),
TP_ARGS(ls, lkb, mode),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(int, mode)
),
TP_fast_assign(
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->mode = mode;
),
TP_printk("ls_id=%u lkb_id=%x mode=%s", __entry->ls_id,
__entry->lkb_id, show_lock_mode(__entry->mode))
);
TRACE_EVENT(dlm_ast,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_lksb *lksb),
TP_ARGS(ls, lkb, lksb),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(u8, sb_flags)
__field(int, sb_status)
),
TP_fast_assign(
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->sb_flags = lksb->sb_flags;
__entry->sb_status = lksb->sb_status;
),
TP_printk("ls_id=%u lkb_id=%x sb_flags=%s sb_status=%d",
__entry->ls_id, __entry->lkb_id,
show_dlm_sb_flags(__entry->sb_flags), __entry->sb_status)
);
/* note: we begin tracing dlm_unlock_start() only if ls and lkb are found */
TRACE_EVENT(dlm_unlock_start,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, __u32 flags),
TP_ARGS(ls, lkb, flags),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(__u32, flags)
),
TP_fast_assign(
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->flags = flags;
),
TP_printk("ls_id=%u lkb_id=%x flags=%s",
__entry->ls_id, __entry->lkb_id,
show_lock_flags(__entry->flags))
);
TRACE_EVENT(dlm_unlock_end,
TP_PROTO(struct dlm_ls *ls, struct dlm_lkb *lkb, __u32 flags,
int error),
TP_ARGS(ls, lkb, flags, error),
TP_STRUCT__entry(
__field(__u32, ls_id)
__field(__u32, lkb_id)
__field(__u32, flags)
__field(int, error)
),
TP_fast_assign(
__entry->ls_id = ls->ls_global_id;
__entry->lkb_id = lkb->lkb_id;
__entry->flags = flags;
__entry->error = error;
),
TP_printk("ls_id=%u lkb_id=%x flags=%s error=%d",
__entry->ls_id, __entry->lkb_id,
show_lock_flags(__entry->flags), __entry->error)
);
TRACE_EVENT(dlm_send,
TP_PROTO(int nodeid, int ret),
TP_ARGS(nodeid, ret),
TP_STRUCT__entry(
__field(int, nodeid)
__field(int, ret)
),
TP_fast_assign(
__entry->nodeid = nodeid;
__entry->ret = ret;
),
TP_printk("nodeid=%d ret=%d", __entry->nodeid, __entry->ret)
);
TRACE_EVENT(dlm_recv,
TP_PROTO(int nodeid, int ret),
TP_ARGS(nodeid, ret),
TP_STRUCT__entry(
__field(int, nodeid)
__field(int, ret)
),
TP_fast_assign(
__entry->nodeid = nodeid;
__entry->ret = ret;
),
TP_printk("nodeid=%d ret=%d", __entry->nodeid, __entry->ret)
);
#endif /* if !defined(_TRACE_DLM_H) || defined(TRACE_HEADER_MULTI_READ) */
/* This part must be outside protection */
#include <trace/define_trace.h>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment