Commit fbfaf03e authored by Linus Torvalds

Merge tag 'dlm-6.4' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:

 - Remove some unused features (related to lock timeouts) that were
   previously scheduled for removal

 - Fix a bug where the pending callback flag would be incorrectly
   cleared, which could result in a missed completion callback

 - Use an unbound workqueue for dlm socket handling so that socket
   operations can be processed with less delay

 - Fix possible lockspace join connection errors with large clusters
   (e.g. over 16 nodes) caused by a small socket backlog setting (a
   short sketch of the backlog issue follows the shortlog below)

 - Use atomic bit ops for internal flags to help avoid mistakes copying
   flag values from messages (a minimal sketch of this pattern follows
   this list)

 - Fix a recently introduced bug where memory for lvb data could be
   unnecessarily allocated for a lock
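
A minimal user-space sketch of the per-bit atomic flag pattern this series
moves to (not DLM code: kernel helpers such as set_bit(), test_and_set_bit()
and for_each_set_bit_from() are approximated here with C11 stdatomic
operations, and all names in the sketch -- flag_test_and_set, flags_val,
FLAG_* -- are illustrative only):

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define FLAG_CB_PENDING_BIT 0
#define FLAG_RESEND_BIT     1
#define FLAG_MAX_BIT        FLAG_RESEND_BIT

/* atomically set a bit and return its previous value, roughly what the
 * kernel's test_and_set_bit() does */
static int flag_test_and_set(atomic_ulong *addr, unsigned int bit)
{
	unsigned long old = atomic_fetch_or(addr, 1UL << bit);

	return !!(old & (1UL << bit));
}

static void flag_clear(atomic_ulong *addr, unsigned int bit)
{
	atomic_fetch_and(addr, ~(1UL << bit));
}

/* snapshot the per-bit flags into one 32-bit value, in the spirit of
 * dlm_flags_val() before a flag word is copied into a message */
static uint32_t flags_val(atomic_ulong *addr, unsigned int max)
{
	unsigned long cur = atomic_load(addr);
	uint32_t val = 0;
	unsigned int bit;

	for (bit = 0; bit <= max; bit++)
		if (cur & (1UL << bit))
			val |= 1U << bit;
	return val;
}

int main(void)
{
	atomic_ulong flags = 0;

	/* only the first caller schedules work; later callers see the bit
	 * already set, which is the property the CB_PENDING fix relies on */
	if (!flag_test_and_set(&flags, FLAG_CB_PENDING_BIT))
		puts("schedule callback work");

	printf("wire value 0x%x\n",
	       (unsigned int)flags_val(&flags, FLAG_MAX_BIT));
	flag_clear(&flags, FLAG_CB_PENDING_BIT);
	return 0;
}

The atomic test-and-set returns the previous bit value, so only one caller
wins the right to schedule callback work, and the snapshot helper never
publishes a half-updated flag word into a message.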

* tag 'dlm-6.4' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: stop unnecessarily filling zero ms_extra bytes
  fs: dlm: switch lkb_sbflags to atomic ops
  fs: dlm: rsb hash table flag value to atomic ops
  fs: dlm: move internal flags to atomic ops
  fs: dlm: change dflags to use atomic bits
  fs: dlm: store lkb distributed flags into own value
  fs: dlm: remove DLM_IFL_LOCAL_MS flag
  fs: dlm: rename stub to local message flag
  fs: dlm: remove deprecated code parts
  DLM: increase socket backlog to avoid hangs with 16 nodes
  fs: dlm: add unbound flag to dlm_io workqueue
  fs: dlm: fix DLM_IFL_CB_PENDING gets overwritten
parents e0fcc9c6 7a40f1f1
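
A second minimal sketch, using plain POSIX sockets rather than any DLM code,
of why an undersized listen() backlog can make lockspace joins fail when many
nodes connect at once; the value 128 below is only an example, not necessarily
the number the patch chooses:

#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	struct sockaddr_in addr;
	int fd = socket(AF_INET, SOCK_STREAM, 0);

	if (fd < 0) {
		perror("socket");
		return 1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	addr.sin_port = 0;		/* any free port */

	if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
		perror("bind");
		return 1;
	}

	/* with a backlog of, say, 5, a burst of simultaneous connect()s from
	 * a large cluster overflows the accept queue and peers see resets or
	 * timeouts; a larger backlog absorbs the join burst */
	if (listen(fd, 128) < 0) {
		perror("listen");
		return 1;
	}

	close(fd);
	return 0;
}

Connections beyond the backlog are not accepted while the queue is full,
which is what surfaced as join errors past roughly 16 nodes.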
......@@ -8,15 +8,6 @@ menuconfig DLM
A general purpose distributed lock manager for kernel or userspace
applications.
config DLM_DEPRECATED_API
bool "DLM deprecated API"
depends on DLM
help
Enables deprecated DLM timeout features that will be removed in
later Linux kernel releases.
If you are unsure, say N.
config DLM_DEBUG
bool "DLM debugging"
depends on DLM
......
......@@ -17,6 +17,5 @@ dlm-y := ast.o \
requestqueue.o \
user.o \
util.o
dlm-$(CONFIG_DLM_DEPRECATED_API) += netlink.o
dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o
......@@ -45,7 +45,7 @@ void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
kref_put(&cb->ref, dlm_release_callback);
}
lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
/* invalidate */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
......@@ -103,10 +103,9 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
cb->sb_status = status;
cb->sb_flags = (sbflags & 0x000000FF);
kref_init(&cb->ref);
if (!(lkb->lkb_flags & DLM_IFL_CB_PENDING)) {
lkb->lkb_flags |= DLM_IFL_CB_PENDING;
if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
}
list_add_tail(&cb->list, &lkb->lkb_callbacks);
if (flags & DLM_CB_CAST)
......@@ -140,7 +139,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int rv;
if (lkb->lkb_flags & DLM_IFL_USER) {
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
dlm_user_add_ast(lkb, flags, mode, status, sbflags);
return;
}
......@@ -209,7 +208,7 @@ void dlm_callback_work(struct work_struct *work)
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
break;
}
......
......@@ -75,9 +75,6 @@ struct dlm_cluster {
unsigned int cl_log_info;
unsigned int cl_protocol;
unsigned int cl_mark;
#ifdef CONFIG_DLM_DEPRECATED_API
unsigned int cl_timewarn_cs;
#endif
unsigned int cl_new_rsb_count;
unsigned int cl_recover_callbacks;
char cl_cluster_name[DLM_LOCKSPACE_LEN];
......@@ -103,9 +100,6 @@ enum {
CLUSTER_ATTR_LOG_INFO,
CLUSTER_ATTR_PROTOCOL,
CLUSTER_ATTR_MARK,
#ifdef CONFIG_DLM_DEPRECATED_API
CLUSTER_ATTR_TIMEWARN_CS,
#endif
CLUSTER_ATTR_NEW_RSB_COUNT,
CLUSTER_ATTR_RECOVER_CALLBACKS,
CLUSTER_ATTR_CLUSTER_NAME,
......@@ -226,9 +220,6 @@ CLUSTER_ATTR(log_debug, NULL);
CLUSTER_ATTR(log_info, NULL);
CLUSTER_ATTR(protocol, dlm_check_protocol_and_dlm_running);
CLUSTER_ATTR(mark, NULL);
#ifdef CONFIG_DLM_DEPRECATED_API
CLUSTER_ATTR(timewarn_cs, dlm_check_zero);
#endif
CLUSTER_ATTR(new_rsb_count, NULL);
CLUSTER_ATTR(recover_callbacks, NULL);
......@@ -243,9 +234,6 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_LOG_INFO] = &cluster_attr_log_info,
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol,
[CLUSTER_ATTR_MARK] = &cluster_attr_mark,
#ifdef CONFIG_DLM_DEPRECATED_API
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs,
#endif
[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count,
[CLUSTER_ATTR_RECOVER_CALLBACKS] = &cluster_attr_recover_callbacks,
[CLUSTER_ATTR_CLUSTER_NAME] = &cluster_attr_cluster_name,
......@@ -436,9 +424,6 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_log_debug = dlm_config.ci_log_debug;
cl->cl_log_info = dlm_config.ci_log_info;
cl->cl_protocol = dlm_config.ci_protocol;
#ifdef CONFIG_DLM_DEPRECATED_API
cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
#endif
cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks;
memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name,
......@@ -959,9 +944,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_LOG_INFO 1
#define DEFAULT_PROTOCOL DLM_PROTO_TCP
#define DEFAULT_MARK 0
#ifdef CONFIG_DLM_DEPRECATED_API
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#endif
#define DEFAULT_NEW_RSB_COUNT 128
#define DEFAULT_RECOVER_CALLBACKS 0
#define DEFAULT_CLUSTER_NAME ""
......@@ -977,9 +959,6 @@ struct dlm_config_info dlm_config = {
.ci_log_info = DEFAULT_LOG_INFO,
.ci_protocol = DEFAULT_PROTOCOL,
.ci_mark = DEFAULT_MARK,
#ifdef CONFIG_DLM_DEPRECATED_API
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
#endif
.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,
.ci_recover_callbacks = DEFAULT_RECOVER_CALLBACKS,
.ci_cluster_name = DEFAULT_CLUSTER_NAME
......
......@@ -37,9 +37,6 @@ struct dlm_config_info {
int ci_log_info;
int ci_protocol;
int ci_mark;
#ifdef CONFIG_DLM_DEPRECATED_API
int ci_timewarn_cs;
#endif
int ci_new_rsb_count;
int ci_recover_callbacks;
char ci_cluster_name[DLM_LOCKSPACE_LEN];
......
......@@ -170,7 +170,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
u64 xid = 0;
u64 us;
if (lkb->lkb_flags & DLM_IFL_USER) {
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
if (lkb->lkb_ua)
xid = lkb->lkb_ua->xid;
}
......@@ -188,7 +188,7 @@ static void print_format2_lock(struct seq_file *s, struct dlm_lkb *lkb,
lkb->lkb_ownpid,
(unsigned long long)xid,
lkb->lkb_exflags,
lkb->lkb_flags,
dlm_iflags_val(lkb),
lkb->lkb_status,
lkb->lkb_grmode,
lkb->lkb_rqmode,
......@@ -230,7 +230,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
{
u64 xid = 0;
if (lkb->lkb_flags & DLM_IFL_USER) {
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
if (lkb->lkb_ua)
xid = lkb->lkb_ua->xid;
}
......@@ -242,7 +242,7 @@ static void print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
lkb->lkb_ownpid,
(unsigned long long)xid,
lkb->lkb_exflags,
lkb->lkb_flags,
dlm_iflags_val(lkb),
lkb->lkb_status,
lkb->lkb_grmode,
lkb->lkb_rqmode,
......
......@@ -99,13 +99,13 @@ do { \
}
#define DLM_RTF_SHRINK 0x00000001
#define DLM_RTF_SHRINK_BIT 0
struct dlm_rsbtable {
struct rb_root keep;
struct rb_root toss;
spinlock_t lock;
uint32_t flags;
unsigned long flags;
};
......@@ -145,9 +145,6 @@ struct dlm_args {
void (*bastfn) (void *astparam, int mode);
int mode;
struct dlm_lksb *lksb;
#ifdef CONFIG_DLM_DEPRECATED_API
unsigned long timeout;
#endif
};
......@@ -197,31 +194,25 @@ struct dlm_args {
#define DLM_LKSTS_GRANTED 2
#define DLM_LKSTS_CONVERT 3
/* lkb_flags */
#define DLM_IFL_MSTCPY 0x00010000
#define DLM_IFL_RESEND 0x00020000
#define DLM_IFL_DEAD 0x00040000
#define DLM_IFL_OVERLAP_UNLOCK 0x00080000
#define DLM_IFL_OVERLAP_CANCEL 0x00100000
#define DLM_IFL_ENDOFLIFE 0x00200000
#ifdef CONFIG_DLM_DEPRECATED_API
#define DLM_IFL_WATCH_TIMEWARN 0x00400000
#define DLM_IFL_TIMEOUT_CANCEL 0x00800000
#endif
#define DLM_IFL_DEADLOCK_CANCEL 0x01000000
#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */
#define DLM_IFL_CB_PENDING 0x04000000
/* Only the least significant 2 bytes are changed by messages; the full value
 * is transmitted, but the receive side sets only the 2 LSBs.
 *
 * Even the wireshark dlm dissector evaluates only the lower bytes and notes
 * that the others may not be used on the sender side; we assume the higher
 * bytes are for internal use or reserved, as long as they are not parsed on
 * the receive side.
 */
#define DLM_IFL_USER 0x00000001
#define DLM_IFL_ORPHAN 0x00000002
/* lkb_iflags */
#define DLM_IFL_MSTCPY_BIT 16
#define __DLM_IFL_MIN_BIT DLM_IFL_MSTCPY_BIT
#define DLM_IFL_RESEND_BIT 17
#define DLM_IFL_DEAD_BIT 18
#define DLM_IFL_OVERLAP_UNLOCK_BIT 19
#define DLM_IFL_OVERLAP_CANCEL_BIT 20
#define DLM_IFL_ENDOFLIFE_BIT 21
#define DLM_IFL_DEADLOCK_CANCEL_BIT 24
#define DLM_IFL_CB_PENDING_BIT 25
#define __DLM_IFL_MAX_BIT DLM_IFL_CB_PENDING_BIT
/* lkb_dflags */
#define DLM_DFL_USER_BIT 0
#define __DLM_DFL_MIN_BIT DLM_DFL_USER_BIT
#define DLM_DFL_ORPHAN_BIT 1
#define __DLM_DFL_MAX_BIT DLM_DFL_ORPHAN_BIT
#define DLM_CB_CAST 0x00000001
#define DLM_CB_BAST 0x00000002
......@@ -244,8 +235,9 @@ struct dlm_lkb {
uint32_t lkb_id; /* our lock ID */
uint32_t lkb_remid; /* lock ID on remote partner */
uint32_t lkb_exflags; /* external flags from caller */
uint32_t lkb_sbflags; /* lksb flags */
uint32_t lkb_flags; /* internal flags */
unsigned long lkb_sbflags; /* lksb flags */
unsigned long lkb_dflags; /* distributed flags */
unsigned long lkb_iflags; /* internal flags */
uint32_t lkb_lvbseq; /* lvb sequence number */
int8_t lkb_status; /* granted, waiting, convert */
......@@ -263,11 +255,6 @@ struct dlm_lkb {
struct list_head lkb_ownqueue; /* list of locks for a process */
ktime_t lkb_timestamp;
#ifdef CONFIG_DLM_DEPRECATED_API
struct list_head lkb_time_list;
unsigned long lkb_timeout_cs;
#endif
spinlock_t lkb_cb_lock;
struct work_struct lkb_cb_work;
struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
......@@ -583,11 +570,6 @@ struct dlm_ls {
struct mutex ls_orphans_mutex;
struct list_head ls_orphans;
#ifdef CONFIG_DLM_DEPRECATED_API
struct mutex ls_timeout_mutex;
struct list_head ls_timeout;
#endif
spinlock_t ls_new_rsb_spin;
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
......@@ -607,9 +589,9 @@ struct dlm_ls {
int ls_slots_size;
struct dlm_slot *ls_slots;
struct dlm_rsb ls_stub_rsb; /* for returning errors */
struct dlm_lkb ls_stub_lkb; /* for returning errors */
struct dlm_message ls_stub_ms; /* for faking a reply */
struct dlm_rsb ls_local_rsb; /* for returning errors */
struct dlm_lkb ls_local_lkb; /* for returning errors */
struct dlm_message ls_local_ms; /* for faking a reply */
struct dentry *ls_debug_rsb_dentry; /* debugfs */
struct dentry *ls_debug_waiters_dentry; /* debugfs */
......@@ -701,9 +683,6 @@ struct dlm_ls {
#define LSFL_RCOM_READY 5
#define LSFL_RCOM_WAIT 6
#define LSFL_UEVENT_WAIT 7
#ifdef CONFIG_DLM_DEPRECATED_API
#define LSFL_TIMEWARN 8
#endif
#define LSFL_CB_DELAY 9
#define LSFL_NODIR 10
......@@ -756,15 +735,76 @@ static inline int dlm_no_directory(struct dlm_ls *ls)
return test_bit(LSFL_NODIR, &ls->ls_flags);
}
#ifdef CONFIG_DLM_DEPRECATED_API
int dlm_netlink_init(void);
void dlm_netlink_exit(void);
void dlm_timeout_warn(struct dlm_lkb *lkb);
#else
static inline int dlm_netlink_init(void) { return 0; }
static inline void dlm_netlink_exit(void) { };
static inline void dlm_timeout_warn(struct dlm_lkb *lkb) { };
#endif
/* takes a snapshot from dlm atomic flags */
static inline uint32_t dlm_flags_val(const unsigned long *addr,
uint32_t min, uint32_t max)
{
uint32_t bit = min, val = 0;
for_each_set_bit_from(bit, addr, max + 1) {
val |= BIT(bit);
}
return val;
}
static inline uint32_t dlm_iflags_val(const struct dlm_lkb *lkb)
{
return dlm_flags_val(&lkb->lkb_iflags, __DLM_IFL_MIN_BIT,
__DLM_IFL_MAX_BIT);
}
static inline uint32_t dlm_dflags_val(const struct dlm_lkb *lkb)
{
return dlm_flags_val(&lkb->lkb_dflags, __DLM_DFL_MIN_BIT,
__DLM_DFL_MAX_BIT);
}
/* coming from UAPI header
*
* TODO:
* Move this to UAPI header and let other values point to them and use BIT()
*/
#define DLM_SBF_DEMOTED_BIT 0
#define __DLM_SBF_MIN_BIT DLM_SBF_DEMOTED_BIT
#define DLM_SBF_VALNOTVALID_BIT 1
#define DLM_SBF_ALTMODE_BIT 2
#define __DLM_SBF_MAX_BIT DLM_SBF_ALTMODE_BIT
static inline uint32_t dlm_sbflags_val(const struct dlm_lkb *lkb)
{
/* be sure the next person updates this */
BUILD_BUG_ON(BIT(__DLM_SBF_MAX_BIT) != DLM_SBF_ALTMODE);
return dlm_flags_val(&lkb->lkb_sbflags, __DLM_SBF_MIN_BIT,
__DLM_SBF_MAX_BIT);
}
static inline void dlm_set_flags_val(unsigned long *addr, uint32_t val,
uint32_t min, uint32_t max)
{
uint32_t bit;
for (bit = min; bit < (max + 1); bit++) {
if (val & BIT(bit))
set_bit(bit, addr);
else
clear_bit(bit, addr);
}
}
static inline void dlm_set_dflags_val(struct dlm_lkb *lkb, uint32_t val)
{
dlm_set_flags_val(&lkb->lkb_dflags, val, __DLM_DFL_MIN_BIT,
__DLM_DFL_MAX_BIT);
}
static inline void dlm_set_sbflags_val(struct dlm_lkb *lkb, uint32_t val)
{
dlm_set_flags_val(&lkb->lkb_sbflags, val, __DLM_SBF_MIN_BIT,
__DLM_SBF_MAX_BIT);
}
int dlm_plock_init(void);
void dlm_plock_exit(void);
......
......@@ -86,10 +86,9 @@ static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms);
struct dlm_message *ms, bool local);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);
/*
......@@ -164,7 +163,7 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
"sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
(unsigned long long)lkb->lkb_recover_seq);
}
......@@ -229,12 +228,12 @@ static inline int force_blocking_asts(struct dlm_lkb *lkb)
static inline int is_demoted(struct dlm_lkb *lkb)
{
return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
}
static inline int is_altmode(struct dlm_lkb *lkb)
{
return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
}
static inline int is_granted(struct dlm_lkb *lkb)
......@@ -250,12 +249,13 @@ static inline int is_remote(struct dlm_rsb *r)
static inline int is_process_copy(struct dlm_lkb *lkb)
{
return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
return lkb->lkb_nodeid &&
!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
}
static inline int is_master_copy(struct dlm_lkb *lkb)
{
return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
}
static inline int middle_conversion(struct dlm_lkb *lkb)
......@@ -273,18 +273,18 @@ static inline int down_conversion(struct dlm_lkb *lkb)
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
}
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
}
static inline int is_overlap(struct dlm_lkb *lkb)
{
return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
DLM_IFL_OVERLAP_CANCEL));
return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
}
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
......@@ -292,25 +292,13 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
if (is_master_copy(lkb))
return;
del_timeout(lkb);
DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
#ifdef CONFIG_DLM_DEPRECATED_API
/* if the operation was a cancel, then return -DLM_ECANCEL, if a
timeout caused the cancel then return -ETIMEDOUT */
if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
rv = -ETIMEDOUT;
}
#endif
if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
if (rv == -DLM_ECANCEL &&
test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
rv = -EDEADLK;
}
dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
}
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
......@@ -1151,7 +1139,7 @@ static void toss_rsb(struct kref *kref)
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
r->res_toss_time = jiffies;
ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
r->res_lvbptr = NULL;
......@@ -1215,9 +1203,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
kref_init(&lkb->lkb_ref);
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
#ifdef CONFIG_DLM_DEPRECATED_API
INIT_LIST_HEAD(&lkb->lkb_time_list);
#endif
INIT_LIST_HEAD(&lkb->lkb_cb_list);
INIT_LIST_HEAD(&lkb->lkb_callbacks);
spin_lock_init(&lkb->lkb_cb_lock);
......@@ -1434,10 +1419,10 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
switch (mstype) {
case DLM_MSG_UNLOCK:
lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
break;
case DLM_MSG_CANCEL:
lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
break;
default:
error = -EBUSY;
......@@ -1448,7 +1433,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
lkb->lkb_id, lkb->lkb_wait_type, mstype,
lkb->lkb_wait_count, lkb->lkb_flags);
lkb->lkb_wait_count, dlm_iflags_val(lkb));
goto out;
}
......@@ -1464,7 +1449,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
out:
if (error)
log_error(ls, "addwait error %x %d flags %x %d %d %s",
lkb->lkb_id, error, lkb->lkb_flags, mstype,
lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
lkb->lkb_wait_type, lkb->lkb_resource->res_name);
mutex_unlock(&ls->ls_waiters_mutex);
return error;
......@@ -1481,16 +1466,16 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int overlap_done = 0;
if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
if (mstype == DLM_MSG_UNLOCK_REPLY &&
test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
overlap_done = 1;
goto out_del;
}
if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
if (mstype == DLM_MSG_CANCEL_REPLY &&
test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
overlap_done = 1;
goto out_del;
}
......@@ -1514,12 +1499,11 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
lingering state of the cancel and fail with -EBUSY. */
if ((mstype == DLM_MSG_CONVERT_REPLY) &&
(lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
is_overlap_cancel(lkb) && ms && !ms->m_result) {
(lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
lkb->lkb_id);
lkb->lkb_wait_type = 0;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_wait_count--;
unhold_lkb(lkb);
goto out_del;
......@@ -1535,7 +1519,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
lkb->lkb_remid, mstype, lkb->lkb_flags);
lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
return -1;
out_del:
......@@ -1554,7 +1538,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
lkb->lkb_flags &= ~DLM_IFL_RESEND;
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
lkb->lkb_wait_count--;
if (!lkb->lkb_wait_count)
list_del_init(&lkb->lkb_wait_reply);
......@@ -1573,18 +1557,19 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
return error;
}
/* Handles situations where we might be processing a "fake" or "stub" reply in
/* Handles situations where we might be processing a "fake" or "local" reply in
which we can't try to take waiters_mutex again. */
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms,
bool local)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
if (!local)
mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
if (!local)
mutex_unlock(&ls->ls_waiters_mutex);
return error;
}
......@@ -1603,7 +1588,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
spin_lock(&ls->ls_rsbtbl[b].lock);
if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
return;
}
......@@ -1658,9 +1643,9 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
}
if (need_shrink)
ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
else
ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
spin_unlock(&ls->ls_rsbtbl[b].lock);
/*
......@@ -1735,133 +1720,6 @@ void dlm_scan_rsbs(struct dlm_ls *ls)
}
}
#ifdef CONFIG_DLM_DEPRECATED_API
static void add_timeout(struct dlm_lkb *lkb)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
if (is_master_copy(lkb))
return;
if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
goto add_it;
}
if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
goto add_it;
return;
add_it:
DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
mutex_lock(&ls->ls_timeout_mutex);
hold_lkb(lkb);
list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
mutex_unlock(&ls->ls_timeout_mutex);
}
static void del_timeout(struct dlm_lkb *lkb)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
mutex_lock(&ls->ls_timeout_mutex);
if (!list_empty(&lkb->lkb_time_list)) {
list_del_init(&lkb->lkb_time_list);
unhold_lkb(lkb);
}
mutex_unlock(&ls->ls_timeout_mutex);
}
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
and then lock rsb because of lock ordering in add_timeout. We may need
to specify some special timeout-related bits in the lkb that are just to
be accessed under the timeout_mutex. */
void dlm_scan_timeout(struct dlm_ls *ls)
{
struct dlm_rsb *r;
struct dlm_lkb *lkb = NULL, *iter;
int do_cancel, do_warn;
s64 wait_us;
for (;;) {
if (dlm_locking_stopped(ls))
break;
do_cancel = 0;
do_warn = 0;
mutex_lock(&ls->ls_timeout_mutex);
list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
wait_us = ktime_to_us(ktime_sub(ktime_get(),
iter->lkb_timestamp));
if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
wait_us >= (iter->lkb_timeout_cs * 10000))
do_cancel = 1;
if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
wait_us >= dlm_config.ci_timewarn_cs * 10000)
do_warn = 1;
if (!do_cancel && !do_warn)
continue;
hold_lkb(iter);
lkb = iter;
break;
}
mutex_unlock(&ls->ls_timeout_mutex);
if (!lkb)
break;
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
if (do_warn) {
/* clear flag so we only warn once */
lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
del_timeout(lkb);
dlm_timeout_warn(lkb);
}
if (do_cancel) {
log_debug(ls, "timeout cancel %x node %d %s",
lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
del_timeout(lkb);
_cancel_lock(r, lkb);
}
unlock_rsb(r);
unhold_rsb(r);
dlm_put_lkb(lkb);
}
}
/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
dlm_recoverd before checking/setting ls_recover_begin. */
void dlm_adjust_timeouts(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
ls->ls_recover_begin = 0;
mutex_lock(&ls->ls_timeout_mutex);
list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
mutex_unlock(&ls->ls_timeout_mutex);
}
#else
static void add_timeout(struct dlm_lkb *lkb) { }
static void del_timeout(struct dlm_lkb *lkb) { }
#endif
/* lkb is master or local copy */
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
......@@ -1912,7 +1770,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
if (rsb_flag(r, RSB_VALNOTVALID))
lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
}
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
......@@ -2384,7 +2242,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
conversion_deadlock_detect(r, lkb)) {
if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
lkb->lkb_grmode = DLM_LOCK_NL;
lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
} else if (err) {
*err = -EDEADLK;
} else {
......@@ -2411,7 +2269,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
lkb->lkb_rqmode = alt;
rv = _can_be_granted(r, lkb, now, 0);
if (rv)
lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
else
lkb->lkb_rqmode = rqmode;
}
......@@ -2723,20 +2581,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
}
}
#ifdef CONFIG_DLM_DEPRECATED_API
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
int namelen, unsigned long timeout_cs,
void (*ast) (void *astparam),
void *astparam,
void (*bast) (void *astparam, int mode),
struct dlm_args *args)
#else
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
int namelen, void (*ast)(void *astparam),
void *astparam,
void (*bast)(void *astparam, int mode),
struct dlm_args *args)
#endif
{
int rv = -EINVAL;
......@@ -2789,9 +2638,6 @@ static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
args->astfn = ast;
args->astparam = astparam;
args->bastfn = bast;
#ifdef CONFIG_DLM_DEPRECATED_API
args->timeout = timeout_cs;
#endif
args->mode = mode;
args->lksb = lksb;
rv = 0;
......@@ -2830,7 +2676,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
goto out;
rv = -EINVAL;
if (lkb->lkb_flags & DLM_IFL_MSTCPY)
if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
goto out;
if (args->flags & DLM_LKF_QUECVT &&
......@@ -2839,7 +2685,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
}
lkb->lkb_exflags = args->flags;
lkb->lkb_sbflags = 0;
dlm_set_sbflags_val(lkb, 0);
lkb->lkb_astfn = args->astfn;
lkb->lkb_astparam = args->astparam;
lkb->lkb_bastfn = args->bastfn;
......@@ -2847,9 +2693,6 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
lkb->lkb_lksb = args->lksb;
lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
lkb->lkb_ownpid = (int) current->pid;
#ifdef CONFIG_DLM_DEPRECATED_API
lkb->lkb_timeout_cs = args->timeout;
#endif
rv = 0;
out:
switch (rv) {
......@@ -2859,13 +2702,13 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* annoy the user because dlm usage is wrong */
WARN_ON(1);
log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
lkb->lkb_status, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
break;
default:
log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
lkb->lkb_status, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
break;
......@@ -2908,7 +2751,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
}
rv = -EINVAL;
if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
dlm_print_lkb(lkb);
goto out;
......@@ -2919,7 +2762,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
* locks; return same error as if the lkid had not been found at all
*/
if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
rv = -ENOENT;
goto out;
......@@ -2934,11 +2777,8 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (is_overlap(lkb))
goto out;
/* don't let scand try to do a cancel */
del_timeout(lkb);
if (lkb->lkb_flags & DLM_IFL_RESEND) {
lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
rv = -EBUSY;
goto out;
}
......@@ -2953,7 +2793,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
switch (lkb->lkb_wait_type) {
case DLM_MSG_LOOKUP:
case DLM_MSG_REQUEST:
lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
rv = -EBUSY;
goto out;
case DLM_MSG_UNLOCK:
......@@ -2975,11 +2815,8 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (is_overlap_unlock(lkb))
goto out;
/* don't let scand try to do a cancel */
del_timeout(lkb);
if (lkb->lkb_flags & DLM_IFL_RESEND) {
lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
rv = -EBUSY;
goto out;
}
......@@ -2987,7 +2824,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
switch (lkb->lkb_wait_type) {
case DLM_MSG_LOOKUP:
case DLM_MSG_REQUEST:
lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
rv = -EBUSY;
goto out;
case DLM_MSG_UNLOCK:
......@@ -2999,7 +2836,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
out_ok:
/* an overlapping op shouldn't blow away exflags from other op */
lkb->lkb_exflags |= args->flags;
lkb->lkb_sbflags = 0;
dlm_set_sbflags_val(lkb, 0);
lkb->lkb_astparam = args->astparam;
rv = 0;
out:
......@@ -3010,13 +2847,13 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
/* annoy the user because dlm usage is wrong */
WARN_ON(1);
log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
args->flags, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
break;
default:
log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
args->flags, lkb->lkb_wait_type,
lkb->lkb_resource->res_name);
break;
......@@ -3045,7 +2882,6 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (can_be_queued(lkb)) {
error = -EINPROGRESS;
add_lkb(r, lkb, DLM_LKSTS_WAITING);
add_timeout(lkb);
goto out;
}
......@@ -3114,7 +2950,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
error = -EINPROGRESS;
del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
add_timeout(lkb);
goto out;
}
......@@ -3401,13 +3236,8 @@ int dlm_lock(dlm_lockspace_t *lockspace,
trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
#ifdef CONFIG_DLM_DEPRECATED_API
error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
astarg, bast, &args);
#else
error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
&args);
#endif
if (error)
goto out_put;
......@@ -3551,7 +3381,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
case DLM_MSG_REQUEST_REPLY:
case DLM_MSG_CONVERT_REPLY:
case DLM_MSG_GRANT:
if (lkb && lkb->lkb_lvbptr)
if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
mb_len += r->res_ls->ls_lvblen;
break;
}
......@@ -3578,8 +3408,8 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
ms->m_lkid = cpu_to_le32(lkb->lkb_id);
ms->m_remid = cpu_to_le32(lkb->lkb_remid);
ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags);
ms->m_flags = cpu_to_le32(lkb->lkb_flags);
ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
ms->m_status = cpu_to_le32(lkb->lkb_status);
ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
......@@ -3656,10 +3486,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* down conversions go without a reply from the master */
if (!error && down_conversion(lkb)) {
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_stub_ms.m_result = 0;
__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_local_ms.m_result = 0;
__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
}
return error;
......@@ -3818,7 +3647,7 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
int ret_nodeid, int rv)
{
struct dlm_rsb *r = &ls->ls_stub_rsb;
struct dlm_rsb *r = &ls->ls_local_rsb;
struct dlm_message *ms;
struct dlm_mhandle *mh;
int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
......@@ -3844,19 +3673,18 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
(le32_to_cpu(ms->m_flags) & 0x0000FFFF);
dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
}
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
bool local)
{
if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS))
if (local)
return;
lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
(le32_to_cpu(ms->m_flags) & 0x0000FFFF);
dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
}
static int receive_extralen(struct dlm_message *ms)
......@@ -3938,12 +3766,12 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
return 0;
}
/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
/* We fill in the local-lkb fields with the info that send_xxxx_reply()
uses to send a reply and that the remote end uses to process the reply. */
static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb = &ls->ls_stub_lkb;
struct dlm_lkb *lkb = &ls->ls_local_lkb;
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
}
......@@ -3957,8 +3785,8 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
int error = 0;
/* currently mixing of user/kernel locks is not supported */
if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
~lkb->lkb_flags & DLM_IFL_USER) {
if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
!test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
log_error(lkb->lkb_resource->res_ls,
"got user dlm message for a kernel lock");
error = -EINVAL;
......@@ -3998,7 +3826,8 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
log_error(lkb->lkb_resource->res_ls,
"ignore invalid message %d from %d %x %x %x %d",
le32_to_cpu(ms->m_type), from, lkb->lkb_id,
lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
lkb->lkb_remid, dlm_iflags_val(lkb),
lkb->lkb_nodeid);
return error;
}
......@@ -4016,7 +3845,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
goto fail;
receive_flags(lkb, ms);
lkb->lkb_flags |= DLM_IFL_MSTCPY;
set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
error = receive_request_args(ls, lkb, ms);
if (error) {
__put_lkb(ls, lkb);
......@@ -4076,8 +3905,8 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
le32_to_cpu(ms->m_lkid), from_nodeid, error);
}
setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
setup_local_lkb(ls, ms);
send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error;
}
......@@ -4132,8 +3961,8 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
fail:
setup_stub_lkb(ls, ms);
send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
setup_local_lkb(ls, ms);
send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error;
}
......@@ -4184,8 +4013,8 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
fail:
setup_stub_lkb(ls, ms);
send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
setup_local_lkb(ls, ms);
send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error;
}
......@@ -4220,8 +4049,8 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
fail:
setup_stub_lkb(ls, ms);
send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
setup_local_lkb(ls, ms);
send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error;
}
......@@ -4244,7 +4073,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
goto out;
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, false);
if (is_altmode(lkb))
munge_altmode(lkb, ms);
grant_lock_pc(r, lkb, ms);
......@@ -4448,13 +4277,12 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
case -EINPROGRESS:
case 0:
/* request was queued or granted on remote master */
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, false);
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
if (is_altmode(lkb))
munge_altmode(lkb, ms);
if (result) {
add_lkb(r, lkb, DLM_LKSTS_WAITING);
add_timeout(lkb);
} else {
grant_lock_pc(r, lkb, ms);
queue_cast(r, lkb, 0);
......@@ -4496,20 +4324,21 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
lkb->lkb_id, result);
}
if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
if ((result == 0 || result == -EINPROGRESS) &&
test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
log_debug(ls, "receive_request_reply %x result %d unlock",
lkb->lkb_id, result);
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
send_unlock(r, lkb);
} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
} else if ((result == -EINPROGRESS) &&
test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
&lkb->lkb_iflags)) {
log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
send_cancel(r, lkb);
} else {
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
}
out:
unlock_rsb(r);
......@@ -4519,7 +4348,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms)
struct dlm_message *ms, bool local)
{
/* this is the value returned from do_convert() on the master */
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
......@@ -4529,24 +4358,23 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
break;
case -EDEADLK:
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, local);
revert_lock_pc(r, lkb);
queue_cast(r, lkb, -EDEADLK);
break;
case -EINPROGRESS:
/* convert was queued on remote master */
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, local);
if (is_demoted(lkb))
munge_demoted(lkb);
del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
add_timeout(lkb);
break;
case 0:
/* convert was granted on remote master */
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, local);
if (is_demoted(lkb))
munge_demoted(lkb);
grant_lock_pc(r, lkb, ms);
......@@ -4563,7 +4391,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
}
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
......@@ -4575,12 +4404,12 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
if (error)
goto out;
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
/* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
__receive_convert_reply(r, lkb, ms);
__receive_convert_reply(r, lkb, ms, local);
out:
unlock_rsb(r);
put_rsb(r);
......@@ -4595,12 +4424,13 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
return error;
_receive_convert_reply(lkb, ms);
_receive_convert_reply(lkb, ms, false);
dlm_put_lkb(lkb);
return 0;
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
......@@ -4612,8 +4442,8 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
if (error)
goto out;
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
/* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
......@@ -4621,7 +4451,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
case -DLM_EUNLOCK:
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, local);
remove_lock_pc(r, lkb);
queue_cast(r, lkb, -DLM_EUNLOCK);
break;
......@@ -4645,12 +4475,13 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
return error;
_receive_unlock_reply(lkb, ms);
_receive_unlock_reply(lkb, ms, false);
dlm_put_lkb(lkb);
return 0;
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
......@@ -4662,8 +4493,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
if (error)
goto out;
/* stub reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms);
/* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
......@@ -4671,7 +4502,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
case -DLM_ECANCEL:
receive_flags_reply(lkb, ms);
receive_flags_reply(lkb, ms, local);
revert_lock_pc(r, lkb);
queue_cast(r, lkb, -DLM_ECANCEL);
break;
......@@ -4696,7 +4527,7 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
return error;
_receive_cancel_reply(lkb, ms);
_receive_cancel_reply(lkb, ms, false);
dlm_put_lkb(lkb);
return 0;
}
......@@ -4763,7 +4594,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
if (is_overlap(lkb)) {
log_debug(ls, "receive_lookup_reply %x unlock %x",
lkb->lkb_id, lkb->lkb_flags);
lkb->lkb_id, dlm_iflags_val(lkb));
queue_cast_overlap(r, lkb);
unhold_lkb(lkb); /* undoes create_lkb() */
goto out_list;
......@@ -5006,16 +4837,15 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms_stub)
struct dlm_message *ms_local)
{
if (middle_conversion(lkb)) {
hold_lkb(lkb);
memset(ms_stub, 0, sizeof(struct dlm_message));
ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_convert_reply(lkb, ms_stub);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_convert_reply(lkb, ms_local, true);
/* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV;
......@@ -5023,7 +4853,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
unhold_lkb(lkb);
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
lkb->lkb_flags |= DLM_IFL_RESEND;
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
}
/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
......@@ -5054,12 +4884,12 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result;
struct dlm_message *ms_local;
int wait_type, local_unlock_result, local_cancel_result;
int dir_nodeid;
ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
if (!ms_stub)
ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
if (!ms_local)
return;
mutex_lock(&ls->ls_waiters_mutex);
......@@ -5087,7 +4917,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
resent after recovery is done */
if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
lkb->lkb_flags |= DLM_IFL_RESEND;
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
continue;
}
......@@ -5095,8 +4925,8 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
continue;
wait_type = lkb->lkb_wait_type;
stub_unlock_result = -DLM_EUNLOCK;
stub_cancel_result = -DLM_ECANCEL;
local_unlock_result = -DLM_EUNLOCK;
local_cancel_result = -DLM_ECANCEL;
/* Main reply may have been received leaving a zero wait_type,
but a reply for the overlapping op may not have been
......@@ -5107,48 +4937,46 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (is_overlap_cancel(lkb)) {
wait_type = DLM_MSG_CANCEL;
if (lkb->lkb_grmode == DLM_LOCK_IV)
stub_cancel_result = 0;
local_cancel_result = 0;
}
if (is_overlap_unlock(lkb)) {
wait_type = DLM_MSG_UNLOCK;
if (lkb->lkb_grmode == DLM_LOCK_IV)
stub_unlock_result = -ENOENT;
local_unlock_result = -ENOENT;
}
log_debug(ls, "rwpre overlap %x %x %d %d %d",
lkb->lkb_id, lkb->lkb_flags, wait_type,
stub_cancel_result, stub_unlock_result);
lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
local_cancel_result, local_unlock_result);
}
switch (wait_type) {
case DLM_MSG_REQUEST:
lkb->lkb_flags |= DLM_IFL_RESEND;
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
break;
case DLM_MSG_CONVERT:
recover_convert_waiter(ls, lkb, ms_stub);
recover_convert_waiter(ls, lkb, ms_local);
break;
case DLM_MSG_UNLOCK:
hold_lkb(lkb);
memset(ms_stub, 0, sizeof(struct dlm_message));
ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result));
ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_unlock_reply(lkb, ms_stub);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_unlock_reply(lkb, ms_local, true);
dlm_put_lkb(lkb);
break;
case DLM_MSG_CANCEL:
hold_lkb(lkb);
memset(ms_stub, 0, sizeof(struct dlm_message));
ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result));
ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_cancel_reply(lkb, ms_stub);
memset(ms_local, 0, sizeof(struct dlm_message));
ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_cancel_reply(lkb, ms_local, true);
dlm_put_lkb(lkb);
break;
......@@ -5159,7 +4987,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
schedule();
}
mutex_unlock(&ls->ls_waiters_mutex);
kfree(ms_stub);
kfree(ms_local);
}
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
......@@ -5168,7 +4996,7 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
if (iter->lkb_flags & DLM_IFL_RESEND) {
if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
hold_lkb(iter);
lkb = iter;
break;
......@@ -5217,8 +5045,10 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
lock_rsb(r);
mstype = lkb->lkb_wait_type;
oc = is_overlap_cancel(lkb);
ou = is_overlap_unlock(lkb);
oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
&lkb->lkb_iflags);
ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
&lkb->lkb_iflags);
err = 0;
log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
......@@ -5231,9 +5061,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
previous op or overlap op on this lock. First, do a big
remove_from_waiters() for all previous ops. */
lkb->lkb_flags &= ~DLM_IFL_RESEND;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
lkb->lkb_wait_type = 0;
/* drop all wait_count references we still
* hold a reference for this iteration.
......@@ -5518,8 +5346,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
lkb->lkb_flags |= DLM_IFL_MSTCPY;
dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
lkb->lkb_rqmode = rl->rl_rqmode;
lkb->lkb_grmode = rl->rl_grmode;
......@@ -5708,14 +5536,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
return 0;
}
#ifdef CONFIG_DLM_DEPRECATED_API
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
int mode, uint32_t flags, void *name, unsigned int namelen,
unsigned long timeout_cs)
#else
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
int mode, uint32_t flags, void *name, unsigned int namelen)
#endif
{
struct dlm_lkb *lkb;
struct dlm_args args;
......@@ -5740,13 +5562,8 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
goto out_put;
}
}
#ifdef CONFIG_DLM_DEPRECATED_API
error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
fake_astfn, ua, fake_bastfn, &args);
#else
error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
fake_bastfn, &args);
#endif
if (error) {
kfree(ua->lksb.sb_lvbptr);
ua->lksb.sb_lvbptr = NULL;
......@@ -5755,9 +5572,9 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
}
/* After ua is attached to lkb it will be freed by dlm_free_lkb().
When DLM_IFL_USER is set, the dlm knows that this is a userspace
When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
lock and that lkb_astparam is the dlm_user_args structure. */
lkb->lkb_flags |= DLM_IFL_USER;
set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
error = request_lock(ls, lkb, name, namelen, &args);
switch (error) {
......@@ -5788,14 +5605,8 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
return error;
}
#ifdef CONFIG_DLM_DEPRECATED_API
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
unsigned long timeout_cs)
#else
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
#endif
{
struct dlm_lkb *lkb;
struct dlm_args args;
......@@ -5832,13 +5643,8 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
ua->bastaddr = ua_tmp->bastaddr;
ua->user_lksb = ua_tmp->user_lksb;
#ifdef CONFIG_DLM_DEPRECATED_API
error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
fake_astfn, ua, fake_bastfn, &args);
#else
error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
fake_bastfn, &args);
#endif
if (error)
goto out_put;
......@@ -5883,7 +5689,7 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
lkb = iter;
list_del_init(&iter->lkb_ownqueue);
iter->lkb_flags &= ~DLM_IFL_ORPHAN;
clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
*lkid = iter->lkb_id;
break;
}
......@@ -6050,7 +5856,7 @@ int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
error = validate_unlock_args(lkb, &args);
if (error)
goto out_r;
lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
error = _cancel_lock(r, lkb);
out_r:
......@@ -6127,9 +5933,9 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
list_del_init(&lkb->lkb_ownqueue);
if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
lkb->lkb_flags |= DLM_IFL_ORPHAN;
set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
else
lkb->lkb_flags |= DLM_IFL_DEAD;
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
out:
spin_unlock(&ls->ls_clear_proc_locks);
return lkb;
......@@ -6155,7 +5961,6 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
lkb = del_proc_lock(ls, proc);
if (!lkb)
break;
del_timeout(lkb);
if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
orphan_proc_lock(ls, lkb);
else
......@@ -6173,7 +5978,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
/* in-progress unlocks */
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
list_del_init(&lkb->lkb_ownqueue);
lkb->lkb_flags |= DLM_IFL_DEAD;
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
dlm_put_lkb(lkb);
}
......@@ -6204,7 +6009,7 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
if (!lkb)
break;
lkb->lkb_flags |= DLM_IFL_DEAD;
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
unlock_proc_lock(ls, lkb);
dlm_put_lkb(lkb); /* ref from proc->locks list */
}
......@@ -6212,7 +6017,7 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
spin_lock(&proc->locks_spin);
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
list_del_init(&lkb->lkb_ownqueue);
lkb->lkb_flags |= DLM_IFL_DEAD;
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
dlm_put_lkb(lkb);
}
spin_unlock(&proc->locks_spin);
......@@ -6279,7 +6084,7 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
{
struct dlm_lksb *lksb;
struct dlm_lkb *lkb;
......@@ -6287,7 +6092,7 @@ int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
int error;
/* we currently can't set a valid user lock */
if (lkb_flags & DLM_IFL_USER)
if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
return -EOPNOTSUPP;
lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
......@@ -6300,11 +6105,11 @@ int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
return error;
}
lkb->lkb_flags = lkb_flags;
dlm_set_dflags_val(lkb, lkb_dflags);
lkb->lkb_nodeid = lkb_nodeid;
lkb->lkb_lksb = lksb;
/* user specific pointer, just don't have it NULL for kernel locks */
if (~lkb_flags & DLM_IFL_USER)
if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
lkb->lkb_astparam = (void *)0xDEADBEEF;
error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
......
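The lock.c hunks above are part of the flag rework: internal state is now changed with per-bit atomic operations on lkb_iflags/lkb_dflags instead of OR/AND updates on a shared lkb_flags word. A minimal sketch of the pattern follows; the struct and bit names are hypothetical (not dlm's), and only the standard kernel bitops are assumed.

/* Illustrative only: hypothetical struct and bit names, not dlm's. */
#include <linux/bitops.h>
#include <linux/types.h>

struct demo_lkb {
        unsigned long iflags;   /* one flag per bit, changed with atomic bitops */
};

#define DEMO_IFL_DEAD_BIT       0
#define DEMO_IFL_ORPHAN_BIT     1

static void demo_mark_dead(struct demo_lkb *lkb)
{
        /*
         * The old "flags |= FLAG" pattern is a plain read-modify-write, so
         * two contexts updating different flags in the same word can lose
         * one of the updates.  set_bit()/clear_bit() are atomic per bit and
         * need no extra locking for that case.
         */
        set_bit(DEMO_IFL_DEAD_BIT, &lkb->iflags);
}

static bool demo_is_orphan(struct demo_lkb *lkb)
{
        return test_bit(DEMO_IFL_ORPHAN_BIT, &lkb->iflags);
}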
......@@ -25,14 +25,6 @@ void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls);
#ifdef CONFIG_DLM_DEPRECATED_API
void dlm_scan_timeout(struct dlm_ls *ls);
void dlm_adjust_timeouts(struct dlm_ls *ls);
#else
static inline void dlm_scan_timeout(struct dlm_ls *ls) { }
static inline void dlm_adjust_timeouts(struct dlm_ls *ls) { }
#endif
int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
unsigned int flags, int *r_nodeid, int *result);
......@@ -47,19 +39,10 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
#ifdef CONFIG_DLM_DEPRECATED_API
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
uint32_t flags, void *name, unsigned int namelen,
unsigned long timeout_cs);
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
unsigned long timeout_cs);
#else
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
uint32_t flags, void *name, unsigned int namelen);
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
int mode, uint32_t flags, uint32_t lkid, char *lvb_in);
#endif
int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
int mode, uint32_t flags, void *name, unsigned int namelen,
uint32_t *lkid);
......
......@@ -273,7 +273,6 @@ static int dlm_scand(void *data)
if (dlm_lock_recovery_try(ls)) {
ls->ls_scan_time = jiffies;
dlm_scan_rsbs(ls);
dlm_scan_timeout(ls);
dlm_unlock_recovery(ls);
} else {
ls->ls_scan_time += HZ;
......@@ -488,28 +487,10 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_ops_arg = ops_arg;
}
#ifdef CONFIG_DLM_DEPRECATED_API
if (flags & DLM_LSFL_TIMEWARN) {
pr_warn_once("===============================================================\n"
"WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
" will be removed in v6.2!\n"
" Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
"===============================================================\n");
set_bit(LSFL_TIMEWARN, &ls->ls_flags);
}
/* ls_exflags are forced to match among nodes, and we don't
* need to require all nodes to have some flags set
*/
ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
DLM_LSFL_NEWEXCL));
#else
/* ls_exflags are forced to match among nodes, and we don't
* need to require all nodes to have some flags set
*/
ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
#endif
size = READ_ONCE(dlm_config.ci_rsbtbl_size);
ls->ls_rsbtbl_size = size;
......@@ -537,10 +518,6 @@ static int new_lockspace(const char *name, const char *cluster,
mutex_init(&ls->ls_waiters_mutex);
INIT_LIST_HEAD(&ls->ls_orphans);
mutex_init(&ls->ls_orphans_mutex);
#ifdef CONFIG_DLM_DEPRECATED_API
INIT_LIST_HEAD(&ls->ls_timeout);
mutex_init(&ls->ls_timeout_mutex);
#endif
INIT_LIST_HEAD(&ls->ls_new_rsb);
spin_lock_init(&ls->ls_new_rsb_spin);
......@@ -552,8 +529,8 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_total_weight = 0;
ls->ls_node_array = NULL;
memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
ls->ls_stub_rsb.res_ls = ls;
memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
ls->ls_local_rsb.res_ls = ls;
ls->ls_debug_rsb_dentry = NULL;
ls->ls_debug_waiters_dentry = NULL;
......@@ -764,7 +741,7 @@ static int lkb_idr_free(int id, void *p, void *data)
{
struct dlm_lkb *lkb = p;
if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
dlm_free_lvb(lkb->lkb_lvbptr);
dlm_free_lkb(lkb);
......
......@@ -1717,8 +1717,8 @@ static void work_stop(void)
static int work_start(void)
{
io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
0);
io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
WQ_UNBOUND, 0);
if (!io_workqueue) {
log_print("can't start dlm_io");
return -ENOMEM;
......@@ -1814,7 +1814,7 @@ static int dlm_listen_for_all(void)
sock->sk->sk_data_ready = lowcomms_listen_data_ready;
release_sock(sock->sk);
result = sock->ops->listen(sock, 5);
result = sock->ops->listen(sock, 128);
if (result < 0) {
dlm_close_sock(&listen_con.sock);
return result;
......
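The lowcomms.c changes above add WQ_UNBOUND so queued socket work can run on any available worker instead of waiting behind the submitting CPU, and raise the kernel socket listen() backlog from 5 to 128 so bursts of join connections from larger clusters are not dropped. A minimal sketch of the same alloc_workqueue() flag combination in a hypothetical module; the module name and work handler are illustrative.

/* Minimal sketch: hypothetical module, real workqueue API. */
#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_io_wq;

static void demo_io_fn(struct work_struct *work)
{
        /* socket send/recv processing would run here */
}

static DECLARE_WORK(demo_io_work, demo_io_fn);

static int __init demo_init(void)
{
        /*
         * WQ_UNBOUND lets the work item run on any CPU as soon as a worker
         * is available; WQ_MEM_RECLAIM guarantees forward progress under
         * memory pressure; WQ_HIGHPRI uses the high-priority worker pool.
         */
        demo_io_wq = alloc_workqueue("demo_io",
                                     WQ_HIGHPRI | WQ_MEM_RECLAIM | WQ_UNBOUND,
                                     0);
        if (!demo_io_wq)
                return -ENOMEM;

        queue_work(demo_io_wq, &demo_io_work);
        return 0;
}

static void __exit demo_exit(void)
{
        destroy_workqueue(demo_io_wq);
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");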
......@@ -46,20 +46,14 @@ static int __init init_dlm(void)
if (error)
goto out_debug;
error = dlm_netlink_init();
if (error)
goto out_user;
error = dlm_plock_init();
if (error)
goto out_netlink;
goto out_user;
printk("DLM installed\n");
return 0;
out_netlink:
dlm_netlink_exit();
out_user:
dlm_user_exit();
out_debug:
......@@ -77,7 +71,6 @@ static int __init init_dlm(void)
static void __exit exit_dlm(void)
{
dlm_plock_exit();
dlm_netlink_exit();
dlm_user_exit();
dlm_config_exit();
dlm_memory_exit();
......
......@@ -118,7 +118,7 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
void dlm_free_lkb(struct dlm_lkb *lkb)
{
if (lkb->lkb_flags & DLM_IFL_USER) {
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
struct dlm_user_args *ua;
ua = lkb->lkb_ua;
if (ua) {
......
// SPDX-License-Identifier: GPL-2.0-only
/*
* Copyright (C) 2007 Red Hat, Inc. All rights reserved.
*/
#include <net/genetlink.h>
#include <linux/dlm.h>
#include <linux/dlm_netlink.h>
#include <linux/gfp.h>
#include "dlm_internal.h"
static uint32_t dlm_nl_seqnum;
static uint32_t listener_nlportid;
static struct genl_family family;
static int prepare_data(u8 cmd, struct sk_buff **skbp, size_t size)
{
struct sk_buff *skb;
void *data;
skb = genlmsg_new(size, GFP_NOFS);
if (!skb)
return -ENOMEM;
/* add the message headers */
data = genlmsg_put(skb, 0, dlm_nl_seqnum++, &family, 0, cmd);
if (!data) {
nlmsg_free(skb);
return -EINVAL;
}
*skbp = skb;
return 0;
}
static struct dlm_lock_data *mk_data(struct sk_buff *skb)
{
struct nlattr *ret;
ret = nla_reserve(skb, DLM_TYPE_LOCK, sizeof(struct dlm_lock_data));
if (!ret)
return NULL;
return nla_data(ret);
}
static int send_data(struct sk_buff *skb)
{
struct genlmsghdr *genlhdr = nlmsg_data((struct nlmsghdr *)skb->data);
void *data = genlmsg_data(genlhdr);
genlmsg_end(skb, data);
return genlmsg_unicast(&init_net, skb, listener_nlportid);
}
static int user_cmd(struct sk_buff *skb, struct genl_info *info)
{
listener_nlportid = info->snd_portid;
printk("user_cmd nlpid %u\n", listener_nlportid);
return 0;
}
static const struct genl_small_ops dlm_nl_ops[] = {
{
.cmd = DLM_CMD_HELLO,
.validate = GENL_DONT_VALIDATE_STRICT | GENL_DONT_VALIDATE_DUMP,
.doit = user_cmd,
},
};
static struct genl_family family __ro_after_init = {
.name = DLM_GENL_NAME,
.version = DLM_GENL_VERSION,
.small_ops = dlm_nl_ops,
.n_small_ops = ARRAY_SIZE(dlm_nl_ops),
.resv_start_op = DLM_CMD_HELLO + 1,
.module = THIS_MODULE,
};
int __init dlm_netlink_init(void)
{
return genl_register_family(&family);
}
void dlm_netlink_exit(void)
{
genl_unregister_family(&family);
}
static void fill_data(struct dlm_lock_data *data, struct dlm_lkb *lkb)
{
struct dlm_rsb *r = lkb->lkb_resource;
memset(data, 0, sizeof(struct dlm_lock_data));
data->version = DLM_LOCK_DATA_VERSION;
data->nodeid = lkb->lkb_nodeid;
data->ownpid = lkb->lkb_ownpid;
data->id = lkb->lkb_id;
data->remid = lkb->lkb_remid;
data->status = lkb->lkb_status;
data->grmode = lkb->lkb_grmode;
data->rqmode = lkb->lkb_rqmode;
if (lkb->lkb_ua)
data->xid = lkb->lkb_ua->xid;
if (r) {
data->lockspace_id = r->res_ls->ls_global_id;
data->resource_namelen = r->res_length;
memcpy(data->resource_name, r->res_name, r->res_length);
}
}
void dlm_timeout_warn(struct dlm_lkb *lkb)
{
struct sk_buff *send_skb;
struct dlm_lock_data *data;
size_t size;
int rv;
size = nla_total_size(sizeof(struct dlm_lock_data)) +
nla_total_size(0); /* why this? */
rv = prepare_data(DLM_CMD_TIMEOUT, &send_skb, size);
if (rv < 0)
return;
data = mk_data(send_skb);
if (!data) {
nlmsg_free(send_skb);
return;
}
fill_data(data, lkb);
send_data(send_skb);
}
......@@ -415,7 +415,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
rl->rl_flags = cpu_to_le32(dlm_dflags_val(lkb));
rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
rl->rl_rqmode = lkb->lkb_rqmode;
rl->rl_grmode = lkb->lkb_grmode;
......
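pack_rcom_lock() above now puts only the distributed flag bits on the wire via dlm_dflags_val(), so purely internal flags can no longer be copied between nodes by accident. The helper below is an illustrative sketch of assembling such a 32-bit wire value from atomic bits; it is not the kernel's dlm_dflags_val()/dlm_set_dflags_val(), and the bit names are made up.

/* Illustrative helpers, not the kernel's dlm_dflags_val()/dlm_set_dflags_val(). */
#include <linux/bitops.h>
#include <linux/types.h>

#define DEMO_DFL_USER_BIT       0
#define DEMO_DFL_ORPHAN_BIT     1

static u32 demo_dflags_val(const unsigned long *dflags)
{
        u32 val = 0;

        /* Only bits meant to travel between nodes are copied out. */
        if (test_bit(DEMO_DFL_USER_BIT, dflags))
                val |= BIT(DEMO_DFL_USER_BIT);
        if (test_bit(DEMO_DFL_ORPHAN_BIT, dflags))
                val |= BIT(DEMO_DFL_ORPHAN_BIT);

        return val;
}

static void demo_set_dflags_val(unsigned long *dflags, u32 val)
{
        if (val & BIT(DEMO_DFL_USER_BIT))
                set_bit(DEMO_DFL_USER_BIT, dflags);
        if (val & BIT(DEMO_DFL_ORPHAN_BIT))
                set_bit(DEMO_DFL_ORPHAN_BIT, dflags);
}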
......@@ -403,7 +403,7 @@ static void set_lock_master(struct list_head *queue, int nodeid)
struct dlm_lkb *lkb;
list_for_each_entry(lkb, queue, lkb_statequeue) {
if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
if (!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
lkb->lkb_nodeid = nodeid;
lkb->lkb_remid = 0;
}
......
......@@ -214,8 +214,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_clear_members_gone(ls);
dlm_adjust_timeouts(ls);
dlm_callback_resume(ls);
error = enable_locking(ls, rv->seq);
......
......@@ -183,7 +183,8 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
struct dlm_user_proc *proc;
int rv;
if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
return;
ls = lkb->lkb_resource->res_ls;
......@@ -195,7 +196,8 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
for cases where a completion ast is received for an operation that
began before clear_proc_locks did its cancel/unlock. */
if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
goto out;
DLM_ASSERT(lkb->lkb_ua, dlm_print_lkb(lkb););
......@@ -206,7 +208,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
goto out;
if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
spin_lock(&proc->asts_spin);
......@@ -229,7 +231,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
}
spin_unlock(&proc->asts_spin);
if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
/* N.B. spin_lock locks_spin, not asts_spin */
spin_lock(&proc->locks_spin);
if (!list_empty(&lkb->lkb_ownqueue)) {
......@@ -259,14 +261,6 @@ static int device_user_lock(struct dlm_user_proc *proc,
goto out;
}
#ifdef CONFIG_DLM_DEPRECATED_API
if (params->timeout)
pr_warn_once("========================================================\n"
"WARNING: the lkb timeout feature is being deprecated and\n"
" will be removed in v6.2!\n"
"========================================================\n");
#endif
ua = kzalloc(sizeof(struct dlm_user_args), GFP_NOFS);
if (!ua)
goto out;
......@@ -279,16 +273,9 @@ static int device_user_lock(struct dlm_user_proc *proc,
ua->xid = params->xid;
if (params->flags & DLM_LKF_CONVERT) {
#ifdef CONFIG_DLM_DEPRECATED_API
error = dlm_user_convert(ls, ua,
params->mode, params->flags,
params->lkid, params->lvb,
(unsigned long) params->timeout);
#else
error = dlm_user_convert(ls, ua,
params->mode, params->flags,
params->lkid, params->lvb);
#endif
} else if (params->flags & DLM_LKF_ORPHAN) {
error = dlm_user_adopt_orphan(ls, ua,
params->mode, params->flags,
......@@ -297,16 +284,9 @@ static int device_user_lock(struct dlm_user_proc *proc,
if (!error)
error = lkid;
} else {
#ifdef CONFIG_DLM_DEPRECATED_API
error = dlm_user_request(ls, ua,
params->mode, params->flags,
params->name, params->namelen,
(unsigned long) params->timeout);
#else
error = dlm_user_request(ls, ua,
params->mode, params->flags,
params->name, params->namelen);
#endif
if (!error)
error = ua->lksb.sb_lkid;
}
......@@ -884,7 +864,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
goto try_another;
case DLM_DEQUEUE_CALLBACK_LAST:
list_del_init(&lkb->lkb_cb_list);
lkb->lkb_flags &= ~DLM_IFL_CB_PENDING;
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
break;
case DLM_DEQUEUE_CALLBACK_SUCCESS:
break;
......
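The device_read() hunk above clears DLM_IFL_CB_PENDING_BIT only when the last queued callback has been consumed; together with test_and_set_bit() on the enqueue side, delivery is scheduled exactly once per burst of callbacks. A sketch of that schedule-once pattern with hypothetical names follows.

/* Sketch of the schedule-once pattern; all names are hypothetical. */
#include <linux/bitops.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/types.h>

#define DEMO_CB_PENDING_BIT     0

struct demo_cb {
        struct list_head list;
        /* callback payload omitted */
};

struct demo_lkb {
        unsigned long iflags;
        struct list_head callbacks;
        spinlock_t cb_lock;
};

/* Producer side: returns true if the caller should schedule delivery work. */
static bool demo_enqueue_cb(struct demo_lkb *lkb, struct demo_cb *cb)
{
        bool need_sched;

        spin_lock(&lkb->cb_lock);
        /* Only the first callback of a burst wins the right to schedule. */
        need_sched = !test_and_set_bit(DEMO_CB_PENDING_BIT, &lkb->iflags);
        list_add_tail(&cb->list, &lkb->callbacks);
        spin_unlock(&lkb->cb_lock);

        return need_sched;
}

/* Consumer side: clear the pending bit only once the queue is drained. */
static struct demo_cb *demo_dequeue_cb(struct demo_lkb *lkb)
{
        struct demo_cb *cb = NULL;

        spin_lock(&lkb->cb_lock);
        if (!list_empty(&lkb->callbacks)) {
                cb = list_first_entry(&lkb->callbacks, struct demo_cb, list);
                list_del_init(&cb->list);
        }
        if (list_empty(&lkb->callbacks))
                clear_bit(DEMO_CB_PENDING_BIT, &lkb->iflags);
        spin_unlock(&lkb->cb_lock);

        return cb;
}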
......@@ -53,9 +53,6 @@ struct dlm_lockspace_ops {
* The dlm should not use a resource directory, but statically assign
* resource mastery to nodes based on the name hash that is otherwise
* used to select the directory node. Must be the same on all nodes.
* DLM_LSFL_TIMEWARN
* The dlm should emit netlink messages if locks have been waiting
* for a configurable amount of time. (Unused.)
* DLM_LSFL_NEWEXCL
* dlm_new_lockspace() should return -EEXIST if the lockspace exists.
*
......
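The comment block above documents the dlm_new_lockspace() flags that remain after DLM_LSFL_TIMEWARN's removal. A minimal sketch of joining a lockspace with DLM_LSFL_NEWEXCL from kernel code follows; the lockspace name, LVB length and error handling are illustrative, and the call assumes the prototype exported by include/linux/dlm.h.

/* Minimal sketch; "demo_ls" and the error handling are illustrative. */
#include <linux/dlm.h>
#include <linux/module.h>
#include <linux/printk.h>

static dlm_lockspace_t *demo_ls;

static int demo_join_lockspace(void)
{
        int error;

        /*
         * DLM_LSFL_NEWEXCL: fail with -EEXIST if the lockspace already
         * exists instead of joining it.  No ops callbacks, 32-byte LVBs.
         */
        error = dlm_new_lockspace("demo_ls", NULL, DLM_LSFL_NEWEXCL, 32,
                                  NULL, NULL, NULL, &demo_ls);
        if (error)
                pr_err("demo: dlm_new_lockspace failed %d\n", error);

        return error;
}

The matching teardown is dlm_release_lockspace().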
......@@ -47,16 +47,8 @@
{ DLM_SBF_ALTMODE, "ALTMODE" })
#define show_lkb_flags(flags) __print_flags(flags, "|", \
{ DLM_IFL_MSTCPY, "MSTCPY" }, \
{ DLM_IFL_RESEND, "RESEND" }, \
{ DLM_IFL_DEAD, "DEAD" }, \
{ DLM_IFL_OVERLAP_UNLOCK, "OVERLAP_UNLOCK" }, \
{ DLM_IFL_OVERLAP_CANCEL, "OVERLAP_CANCEL" }, \
{ DLM_IFL_ENDOFLIFE, "ENDOFLIFE" }, \
{ DLM_IFL_DEADLOCK_CANCEL, "DEADLOCK_CANCEL" }, \
{ DLM_IFL_STUB_MS, "STUB_MS" }, \
{ DLM_IFL_USER, "USER" }, \
{ DLM_IFL_ORPHAN, "ORPHAN" })
{ BIT(DLM_DFL_USER_BIT), "USER" }, \
{ BIT(DLM_DFL_ORPHAN_BIT), "ORPHAN" })
#define show_header_cmd(cmd) __print_symbolic(cmd, \
{ DLM_MSG, "MSG"}, \
......
......@@ -68,6 +68,7 @@ struct dlm_lksb {
/* dlm_new_lockspace() flags */
/* DLM_LSFL_TIMEWARN is deprecated and reserved. DO NOT USE! */
#define DLM_LSFL_TIMEWARN 0x00000002
#define DLM_LSFL_NEWEXCL 0x00000008
......
/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
/*
* Copyright (C) 2007 Red Hat, Inc. All rights reserved.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU General Public License v.2.
*/
#ifndef _DLM_NETLINK_H
#define _DLM_NETLINK_H
#include <linux/types.h>
#include <linux/dlmconstants.h>
enum {
DLM_STATUS_WAITING = 1,
DLM_STATUS_GRANTED = 2,
DLM_STATUS_CONVERT = 3,
};
#define DLM_LOCK_DATA_VERSION 1
struct dlm_lock_data {
__u16 version;
__u32 lockspace_id;
int nodeid;
int ownpid;
__u32 id;
__u32 remid;
__u64 xid;
__s8 status;
__s8 grmode;
__s8 rqmode;
unsigned long timestamp;
int resource_namelen;
char resource_name[DLM_RESNAME_MAXLEN];
};
enum {
DLM_CMD_UNSPEC = 0,
DLM_CMD_HELLO, /* user->kernel */
DLM_CMD_TIMEOUT, /* kernel->user */
__DLM_CMD_MAX,
};
#define DLM_CMD_MAX (__DLM_CMD_MAX - 1)
enum {
DLM_TYPE_UNSPEC = 0,
DLM_TYPE_LOCK,
__DLM_TYPE_MAX,
};
#define DLM_TYPE_MAX (__DLM_TYPE_MAX - 1)
#define DLM_GENL_VERSION 0x1
#define DLM_GENL_NAME "DLM"
#endif /* _DLM_NETLINK_H */
......@@ -87,7 +87,6 @@
* DLM_LKF_NODLCKWT
*
* Do not cancel the lock if it gets into conversion deadlock.
* Exclude this lock from being monitored due to DLM_LSFL_TIMEWARN.
*
* DLM_LKF_NODLCKBLK
*
......@@ -132,6 +131,10 @@
* Unlock the lock even if it is converting or waiting or has sublocks.
* Only really for use by the userland device.c code.
*
* DLM_LKF_TIMEOUT
*
* This value is deprecated and reserved. DO NOT USE!
*
*/
#define DLM_LKF_NOQUEUE 0x00000001
......
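The dlmconstants.h comments above keep DLM_LKF_TIMEOUT only as a reserved value; the remaining request flags are unchanged. A hedged sketch of an asynchronous request using DLM_LKF_NOQUEUE follows; the lockspace handle, resource name and callbacks are made up, and the prototype is assumed from include/linux/dlm.h.

/* Illustrative request; "demo-resource" and the ast callbacks are made up. */
#include <linux/dlm.h>
#include <linux/printk.h>
#include <linux/string.h>

static struct dlm_lksb demo_lksb;

static void demo_ast(void *astarg)
{
        struct dlm_lksb *lksb = astarg;

        /* sb_status reports the result; with NOQUEUE an ungrantable
         * request fails with -EAGAIN instead of being queued. */
        pr_info("demo: lock %x completed, status %d\n",
                lksb->sb_lkid, lksb->sb_status);
}

static void demo_bast(void *astarg, int mode)
{
        pr_info("demo: blocking a request for mode %d\n", mode);
}

static int demo_try_lock(dlm_lockspace_t *ls)
{
        return dlm_lock(ls, DLM_LOCK_EX, &demo_lksb, DLM_LKF_NOQUEUE,
                        "demo-resource", strlen("demo-resource"), 0,
                        demo_ast, &demo_lksb, demo_bast);
}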