Commit 16502046 authored by NeilBrown's avatar NeilBrown Committed by Greg Kroah-Hartman

staging: lustre: change sai_thread to sai_task.

Rather than allocating a ptlrpc_thread for the
stat-ahead thread, just use the task_struct provided
by kthreads directly.

As nothing ever waits for the sai_task, it must call do_exit()
directly rather than simply return from the function.
Also it cannot use kthread_should_stop() to know when to stop.

There is one caller which can ask it to stop so we need a simple
signaling mechanism.  I've chosen to set ->sai_task to NULL
when the thread should finish up.  The thread notices this and
cleans up and exits.
lli_sa_lock is used to avoid races between waking up the process
and the process exiting.
Signed-off-by: default avatarNeilBrown <neilb@suse.com>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent c044fb0f
...@@ -1070,7 +1070,7 @@ struct ll_statahead_info { ...@@ -1070,7 +1070,7 @@ struct ll_statahead_info {
sai_agl_valid:1,/* AGL is valid for the dir */ sai_agl_valid:1,/* AGL is valid for the dir */
sai_in_readpage:1;/* statahead in readdir() */ sai_in_readpage:1;/* statahead in readdir() */
wait_queue_head_t sai_waitq; /* stat-ahead wait queue */ wait_queue_head_t sai_waitq; /* stat-ahead wait queue */
struct ptlrpc_thread sai_thread; /* stat-ahead thread */ struct task_struct *sai_task; /* stat-ahead thread */
struct task_struct *sai_agl_task; /* AGL thread */ struct task_struct *sai_agl_task; /* AGL thread */
struct list_head sai_interim_entries; /* entries which got async struct list_head sai_interim_entries; /* entries which got async
* stat reply, but not * stat reply, but not
......
...@@ -267,7 +267,7 @@ sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry) ...@@ -267,7 +267,7 @@ sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
/* called by scanner after use, sa_entry will be killed */ /* called by scanner after use, sa_entry will be killed */
static void static void
sa_put(struct ll_statahead_info *sai, struct sa_entry *entry) sa_put(struct ll_statahead_info *sai, struct sa_entry *entry, struct ll_inode_info *lli)
{ {
struct sa_entry *tmp, *next; struct sa_entry *tmp, *next;
...@@ -295,7 +295,11 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry) ...@@ -295,7 +295,11 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
sa_kill(sai, tmp); sa_kill(sai, tmp);
} }
wake_up(&sai->sai_thread.t_ctl_waitq); spin_lock(&lli->lli_sa_lock);
if (sai->sai_task)
wake_up_process(sai->sai_task);
spin_unlock(&lli->lli_sa_lock);
} }
/* /*
...@@ -403,7 +407,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) ...@@ -403,7 +407,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
sai->sai_max = LL_SA_RPC_MIN; sai->sai_max = LL_SA_RPC_MIN;
sai->sai_index = 1; sai->sai_index = 1;
init_waitqueue_head(&sai->sai_waitq); init_waitqueue_head(&sai->sai_waitq);
init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
INIT_LIST_HEAD(&sai->sai_interim_entries); INIT_LIST_HEAD(&sai->sai_interim_entries);
INIT_LIST_HEAD(&sai->sai_entries); INIT_LIST_HEAD(&sai->sai_entries);
...@@ -465,7 +468,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) ...@@ -465,7 +468,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
lli->lli_sai = NULL; lli->lli_sai = NULL;
spin_unlock(&lli->lli_sa_lock); spin_unlock(&lli->lli_sa_lock);
LASSERT(thread_is_stopped(&sai->sai_thread)); LASSERT(sai->sai_task == NULL);
LASSERT(sai->sai_agl_task == NULL); LASSERT(sai->sai_agl_task == NULL);
LASSERT(sai->sai_sent == sai->sai_replied); LASSERT(sai->sai_sent == sai->sai_replied);
LASSERT(!sa_has_callback(sai)); LASSERT(!sa_has_callback(sai));
...@@ -646,7 +649,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, ...@@ -646,7 +649,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
struct ll_inode_info *lli = ll_i2info(dir); struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = lli->lli_sai; struct ll_statahead_info *sai = lli->lli_sai;
struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata; struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
wait_queue_head_t *waitq = NULL;
__u64 handle = 0; __u64 handle = 0;
if (it_disposition(it, DISP_LOOKUP_NEG)) if (it_disposition(it, DISP_LOOKUP_NEG))
...@@ -657,7 +659,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, ...@@ -657,7 +659,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
* sai should be always valid, no need to refcount * sai should be always valid, no need to refcount
*/ */
LASSERT(sai); LASSERT(sai);
LASSERT(!thread_is_stopped(&sai->sai_thread));
LASSERT(entry); LASSERT(entry);
CDEBUG(D_READA, "sa_entry %.*s rc %d\n", CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
...@@ -681,8 +682,9 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, ...@@ -681,8 +682,9 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
spin_lock(&lli->lli_sa_lock); spin_lock(&lli->lli_sa_lock);
if (rc) { if (rc) {
if (__sa_make_ready(sai, entry, rc)) if (__sa_make_ready(sai, entry, rc))
waitq = &sai->sai_waitq; wake_up(&sai->sai_waitq);
} else { } else {
int first = 0;
entry->se_minfo = minfo; entry->se_minfo = minfo;
entry->se_req = ptlrpc_request_addref(req); entry->se_req = ptlrpc_request_addref(req);
/* /*
...@@ -693,14 +695,15 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, ...@@ -693,14 +695,15 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
*/ */
entry->se_handle = handle; entry->se_handle = handle;
if (!sa_has_callback(sai)) if (!sa_has_callback(sai))
waitq = &sai->sai_thread.t_ctl_waitq; first = 1;
list_add_tail(&entry->se_list, &sai->sai_interim_entries); list_add_tail(&entry->se_list, &sai->sai_interim_entries);
if (first && sai->sai_task)
wake_up_process(sai->sai_task);
} }
sai->sai_replied++; sai->sai_replied++;
if (waitq)
wake_up(waitq);
spin_unlock(&lli->lli_sa_lock); spin_unlock(&lli->lli_sa_lock);
return rc; return rc;
...@@ -942,17 +945,13 @@ static int ll_statahead_thread(void *arg) ...@@ -942,17 +945,13 @@ static int ll_statahead_thread(void *arg)
struct inode *dir = d_inode(parent); struct inode *dir = d_inode(parent);
struct ll_inode_info *lli = ll_i2info(dir); struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir); struct ll_sb_info *sbi = ll_i2sbi(dir);
struct ll_statahead_info *sai; struct ll_statahead_info *sai = lli->lli_sai;
struct ptlrpc_thread *sa_thread;
struct page *page = NULL; struct page *page = NULL;
__u64 pos = 0; __u64 pos = 0;
int first = 0; int first = 0;
int rc = 0; int rc = 0;
struct md_op_data *op_data; struct md_op_data *op_data;
sai = ll_sai_get(dir);
sa_thread = &sai->sai_thread;
sa_thread->t_pid = current_pid();
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n", CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
sai, parent); sai, parent);
...@@ -965,21 +964,7 @@ static int ll_statahead_thread(void *arg) ...@@ -965,21 +964,7 @@ static int ll_statahead_thread(void *arg)
op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
if (sbi->ll_flags & LL_SBI_AGL_ENABLED) while (pos != MDS_DIR_END_OFF && sai->sai_task) {
ll_start_agl(parent, sai);
atomic_inc(&sbi->ll_sa_total);
spin_lock(&lli->lli_sa_lock);
if (thread_is_init(sa_thread))
/* If someone else has changed the thread state
* (e.g. already changed to SVC_STOPPING), we can't just
* blindly overwrite that setting.
*/
thread_set_flags(sa_thread, SVC_RUNNING);
spin_unlock(&lli->lli_sa_lock);
wake_up(&sa_thread->t_ctl_waitq);
while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
struct lu_dirpage *dp; struct lu_dirpage *dp;
struct lu_dirent *ent; struct lu_dirent *ent;
...@@ -996,7 +981,7 @@ static int ll_statahead_thread(void *arg) ...@@ -996,7 +981,7 @@ static int ll_statahead_thread(void *arg)
dp = page_address(page); dp = page_address(page);
for (ent = lu_dirent_start(dp); for (ent = lu_dirent_start(dp);
ent && thread_is_running(sa_thread) && !sa_low_hit(sai); ent && sai->sai_task && !sa_low_hit(sai);
ent = lu_dirent_next(ent)) { ent = lu_dirent_next(ent)) {
struct lu_fid fid; struct lu_fid fid;
__u64 hash; __u64 hash;
...@@ -1046,13 +1031,7 @@ static int ll_statahead_thread(void *arg) ...@@ -1046,13 +1031,7 @@ static int ll_statahead_thread(void *arg)
fid_le_to_cpu(&fid, &ent->lde_fid); fid_le_to_cpu(&fid, &ent->lde_fid);
/* wait for spare statahead window */
do { do {
wait_event_idle(sa_thread->t_ctl_waitq,
!sa_sent_full(sai) ||
sa_has_callback(sai) ||
!list_empty(&sai->sai_agls) ||
!thread_is_running(sa_thread));
sa_handle_callback(sai); sa_handle_callback(sai);
spin_lock(&lli->lli_agl_lock); spin_lock(&lli->lli_agl_lock);
...@@ -1072,8 +1051,16 @@ static int ll_statahead_thread(void *arg) ...@@ -1072,8 +1051,16 @@ static int ll_statahead_thread(void *arg)
spin_lock(&lli->lli_agl_lock); spin_lock(&lli->lli_agl_lock);
} }
spin_unlock(&lli->lli_agl_lock); spin_unlock(&lli->lli_agl_lock);
} while (sa_sent_full(sai) &&
thread_is_running(sa_thread)); set_current_state(TASK_IDLE);
if (sa_sent_full(sai) &&
!sa_has_callback(sai) &&
agl_list_empty(sai) &&
sai->sai_task)
/* wait for spare statahead window */
schedule();
__set_current_state(TASK_RUNNING);
} while (sa_sent_full(sai) && sai->sai_task);
sa_statahead(parent, name, namelen, &fid); sa_statahead(parent, name, namelen, &fid);
} }
...@@ -1096,7 +1083,7 @@ static int ll_statahead_thread(void *arg) ...@@ -1096,7 +1083,7 @@ static int ll_statahead_thread(void *arg)
if (rc < 0) { if (rc < 0) {
spin_lock(&lli->lli_sa_lock); spin_lock(&lli->lli_sa_lock);
thread_set_flags(sa_thread, SVC_STOPPING); sai->sai_task = NULL;
lli->lli_sa_enabled = 0; lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock); spin_unlock(&lli->lli_sa_lock);
} }
...@@ -1105,12 +1092,14 @@ static int ll_statahead_thread(void *arg) ...@@ -1105,12 +1092,14 @@ static int ll_statahead_thread(void *arg)
* statahead is finished, but statahead entries need to be cached, wait * statahead is finished, but statahead entries need to be cached, wait
* for file release to stop me. * for file release to stop me.
*/ */
while (thread_is_running(sa_thread)) { while (sai->sai_task) {
wait_event_idle(sa_thread->t_ctl_waitq,
sa_has_callback(sai) ||
!thread_is_running(sa_thread));
sa_handle_callback(sai); sa_handle_callback(sai);
set_current_state(TASK_IDLE);
if (!sa_has_callback(sai) &&
sai->sai_task)
schedule();
__set_current_state(TASK_RUNNING);
} }
out: out:
if (sai->sai_agl_task) { if (sai->sai_agl_task) {
...@@ -1126,26 +1115,23 @@ static int ll_statahead_thread(void *arg) ...@@ -1126,26 +1115,23 @@ static int ll_statahead_thread(void *arg)
*/ */
while (sai->sai_sent != sai->sai_replied) { while (sai->sai_sent != sai->sai_replied) {
/* in case we're not woken up, timeout wait */ /* in case we're not woken up, timeout wait */
wait_event_idle_timeout(sa_thread->t_ctl_waitq, schedule_timeout_idle(HZ>>3);
sai->sai_sent == sai->sai_replied,
HZ>>3);
} }
/* release resources held by statahead RPCs */ /* release resources held by statahead RPCs */
sa_handle_callback(sai); sa_handle_callback(sai);
spin_lock(&lli->lli_sa_lock);
thread_set_flags(sa_thread, SVC_STOPPED);
spin_unlock(&lli->lli_sa_lock);
CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n", CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
sai, parent); sai, parent);
spin_lock(&lli->lli_sa_lock);
sai->sai_task = NULL;
spin_unlock(&lli->lli_sa_lock);
wake_up(&sai->sai_waitq); wake_up(&sai->sai_waitq);
wake_up(&sa_thread->t_ctl_waitq);
ll_sai_put(sai); ll_sai_put(sai);
return rc; do_exit(rc);
} }
/* authorize opened dir handle @key to statahead */ /* authorize opened dir handle @key to statahead */
...@@ -1187,13 +1173,13 @@ void ll_deauthorize_statahead(struct inode *dir, void *key) ...@@ -1187,13 +1173,13 @@ void ll_deauthorize_statahead(struct inode *dir, void *key)
lli->lli_opendir_pid = 0; lli->lli_opendir_pid = 0;
lli->lli_sa_enabled = 0; lli->lli_sa_enabled = 0;
sai = lli->lli_sai; sai = lli->lli_sai;
if (sai && thread_is_running(&sai->sai_thread)) { if (sai && sai->sai_task) {
/* /*
* statahead thread may not quit yet because it needs to cache * statahead thread may not quit yet because it needs to cache
* entries, now it's time to tell it to quit. * entries, now it's time to tell it to quit.
*/ */
thread_set_flags(&sai->sai_thread, SVC_STOPPING); wake_up_process(sai->sai_task);
wake_up(&sai->sai_thread.t_ctl_waitq); sai->sai_task = NULL;
} }
spin_unlock(&lli->lli_sa_lock); spin_unlock(&lli->lli_sa_lock);
} }
...@@ -1463,7 +1449,7 @@ static int revalidate_statahead_dentry(struct inode *dir, ...@@ -1463,7 +1449,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
*/ */
ldd = ll_d2d(*dentryp); ldd = ll_d2d(*dentryp);
ldd->lld_sa_generation = lli->lli_sa_generation; ldd->lld_sa_generation = lli->lli_sa_generation;
sa_put(sai, entry); sa_put(sai, entry, lli);
return rc; return rc;
} }
...@@ -1483,7 +1469,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) ...@@ -1483,7 +1469,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
{ {
struct ll_inode_info *lli = ll_i2info(dir); struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = NULL; struct ll_statahead_info *sai = NULL;
struct ptlrpc_thread *thread;
struct task_struct *task; struct task_struct *task;
struct dentry *parent = dentry->d_parent; struct dentry *parent = dentry->d_parent;
int rc; int rc;
...@@ -1523,18 +1508,21 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) ...@@ -1523,18 +1508,21 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n", CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
current_pid(), parent); current_pid(), parent);
task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u", task = kthread_create(ll_statahead_thread, parent, "ll_sa_%u",
lli->lli_opendir_pid); lli->lli_opendir_pid);
thread = &sai->sai_thread;
if (IS_ERR(task)) { if (IS_ERR(task)) {
rc = PTR_ERR(task); rc = PTR_ERR(task);
CERROR("can't start ll_sa thread, rc : %d\n", rc); CERROR("can't start ll_sa thread, rc : %d\n", rc);
goto out; goto out;
} }
wait_event_idle(thread->t_ctl_waitq, if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED)
thread_is_running(thread) || thread_is_stopped(thread)); ll_start_agl(parent, sai);
ll_sai_put(sai);
atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
sai->sai_task = task;
wake_up_process(task);
/* /*
* We don't stat-ahead for the first dirent since we are already in * We don't stat-ahead for the first dirent since we are already in
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment