Commit 1dd13c8d authored by Kent Overstreet

bcache: kill closure locking code

Also flesh out the documentation a bit
Signed-off-by: Kent Overstreet <kmo@daterainc.com>
parent cb7a583e
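
For context on what replaces the removed closure_lock()/closure_trylock() helpers: they were built on the plain waitlist primitives that this patch keeps. Below is a minimal sketch of that surviving pattern, using only functions visible in this diff (closure_init_stack(), closure_wait(), closure_sync(), closure_wake_up()). The `frobnicator` structure and function names are hypothetical, not from the patch, and the sketch assumes the waker cannot run before closure_wait() returns (the condition-recheck race that the removed closure_wait_event() comment describes).

#include "closure.h"

/* Hypothetical example - not part of the patch. One thread parks itself on a
 * dedicated waitlist; another thread later releases every parked waiter. */
struct frobnicator {
	struct closure_waitlist	done;	/* waiters parked until the event fires */
};

static void frob_wait_done(struct frobnicator *f)
{
	struct closure cl;

	closure_init_stack(&cl);

	/* @f->done now owns a ref on @cl, released by closure_wake_up(). */
	closure_wait(&f->done, &cl);

	/* Sleep until that ref is dropped and only our own ref remains. */
	closure_sync(&cl);
}

static void frob_finish(struct frobnicator *f)
{
	/* Drops the refs closure_wait() handed to the waitlist, waking all
	 * parked closures. */
	closure_wake_up(&f->done);
}
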
@@ -11,19 +11,6 @@
 #include "closure.h"

-#define CL_FIELD(type, field) \
-        case TYPE_ ## type: \
-        return &container_of(cl, struct type, cl)->field
-
-static struct closure_waitlist *closure_waitlist(struct closure *cl)
-{
-        switch (cl->type) {
-                CL_FIELD(closure_with_waitlist, wait);
-        default:
-                return NULL;
-        }
-}
-
 static inline void closure_put_after_sub(struct closure *cl, int flags)
 {
         int r = flags & CLOSURE_REMAINING_MASK;
@@ -42,17 +29,10 @@ static inline void closure_put_after_sub(struct closure *cl, int flags)
                 closure_queue(cl);
         } else {
                 struct closure *parent = cl->parent;
-                struct closure_waitlist *wait = closure_waitlist(cl);
                 closure_fn *destructor = cl->fn;

                 closure_debug_destroy(cl);

-                smp_mb();
-                atomic_set(&cl->remaining, -1);
-
-                if (wait)
-                        closure_wake_up(wait);
-
                 if (destructor)
                         destructor(cl);
@@ -69,19 +49,18 @@ void closure_sub(struct closure *cl, int v)
 }
 EXPORT_SYMBOL(closure_sub);

+/**
+ * closure_put - decrement a closure's refcount
+ */
 void closure_put(struct closure *cl)
 {
         closure_put_after_sub(cl, atomic_dec_return(&cl->remaining));
 }
 EXPORT_SYMBOL(closure_put);

-static void set_waiting(struct closure *cl, unsigned long f)
-{
-#ifdef CONFIG_BCACHE_CLOSURES_DEBUG
-        cl->waiting_on = f;
-#endif
-}
-
+/**
+ * closure_wake_up - wake up all closures on a wait list, without memory barrier
+ */
 void __closure_wake_up(struct closure_waitlist *wait_list)
 {
         struct llist_node *list;
@@ -106,27 +85,34 @@ void __closure_wake_up(struct closure_waitlist *wait_list)
                 cl = container_of(reverse, struct closure, list);
                 reverse = llist_next(reverse);

-                set_waiting(cl, 0);
+                closure_set_waiting(cl, 0);
                 closure_sub(cl, CLOSURE_WAITING + 1);
         }
 }
 EXPORT_SYMBOL(__closure_wake_up);

-bool closure_wait(struct closure_waitlist *list, struct closure *cl)
+/**
+ * closure_wait - add a closure to a waitlist
+ *
+ * @waitlist will own a ref on @cl, which will be released when
+ * closure_wake_up() is called on @waitlist.
+ *
+ */
+bool closure_wait(struct closure_waitlist *waitlist, struct closure *cl)
 {
         if (atomic_read(&cl->remaining) & CLOSURE_WAITING)
                 return false;

-        set_waiting(cl, _RET_IP_);
+        closure_set_waiting(cl, _RET_IP_);
         atomic_add(CLOSURE_WAITING + 1, &cl->remaining);
-        llist_add(&cl->list, &list->list);
+        llist_add(&cl->list, &waitlist->list);

         return true;
 }
 EXPORT_SYMBOL(closure_wait);

 /**
- * closure_sync() - sleep until a closure a closure has nothing left to wait on
+ * closure_sync - sleep until a closure a closure has nothing left to wait on
  *
  * Sleeps until the refcount hits 1 - the thread that's running the closure owns
  * the last refcount.
@@ -148,46 +134,6 @@ void closure_sync(struct closure *cl)
 }
 EXPORT_SYMBOL(closure_sync);

-/**
- * closure_trylock() - try to acquire the closure, without waiting
- * @cl: closure to lock
- *
- * Returns true if the closure was succesfully locked.
- */
-bool closure_trylock(struct closure *cl, struct closure *parent)
-{
-        if (atomic_cmpxchg(&cl->remaining, -1,
-                           CLOSURE_REMAINING_INITIALIZER) != -1)
-                return false;
-
-        smp_mb();
-
-        cl->parent = parent;
-        if (parent)
-                closure_get(parent);
-
-        closure_set_ret_ip(cl);
-        closure_debug_create(cl);
-        return true;
-}
-EXPORT_SYMBOL(closure_trylock);
-
-void __closure_lock(struct closure *cl, struct closure *parent,
-                    struct closure_waitlist *wait_list)
-{
-        struct closure wait;
-        closure_init_stack(&wait);
-
-        while (1) {
-                if (closure_trylock(cl, parent))
-                        return;
-
-                closure_wait_event(wait_list, &wait,
-                                   atomic_read(&cl->remaining) == -1);
-        }
-}
-EXPORT_SYMBOL(__closure_lock);
-
 #ifdef CONFIG_BCACHE_CLOSURES_DEBUG

 static LIST_HEAD(closure_list);
...
@@ -72,30 +72,6 @@
  * closure - _always_ use continue_at(). Doing so consistently will help
  * eliminate an entire class of particularly pernicious races.
  *
- * For a closure to wait on an arbitrary event, we need to introduce waitlists:
- *
- *   struct closure_waitlist list;
- *   closure_wait_event(list, cl, condition);
- *   closure_wake_up(wait_list);
- *
- * These work analagously to wait_event() and wake_up() - except that instead of
- * operating on the current thread (for wait_event()) and lists of threads, they
- * operate on an explicit closure and lists of closures.
- *
- * Because it's a closure we can now wait either synchronously or
- * asynchronously. closure_wait_event() returns the current value of the
- * condition, and if it returned false continue_at() or closure_sync() can be
- * used to wait for it to become true.
- *
- * It's useful for waiting on things when you can't sleep in the context in
- * which you must check the condition (perhaps a spinlock held, or you might be
- * beneath generic_make_request() - in which case you can't sleep on IO).
- *
- * closure_wait_event() will wait either synchronously or asynchronously,
- * depending on whether the closure is in blocking mode or not. You can pick a
- * mode explicitly with closure_wait_event_sync() and
- * closure_wait_event_async(), which do just what you might expect.
- *
  * Lastly, you might have a wait list dedicated to a specific event, and have no
  * need for specifying the condition - you just want to wait until someone runs
  * closure_wake_up() on the appropriate wait list. In that case, just use
@@ -121,40 +97,6 @@
  * All this implies that a closure should typically be embedded in a particular
  * struct (which its refcount will normally control the lifetime of), and that
  * struct can very much be thought of as a stack frame.
- *
- * Locking:
- *
- * Closures are based on work items but they can be thought of as more like
- * threads - in that like threads and unlike work items they have a well
- * defined lifetime; they are created (with closure_init()) and eventually
- * complete after a continue_at(cl, NULL, NULL).
- *
- * Suppose you've got some larger structure with a closure embedded in it that's
- * used for periodically doing garbage collection. You only want one garbage
- * collection happening at a time, so the natural thing to do is protect it with
- * a lock. However, it's difficult to use a lock protecting a closure correctly
- * because the unlock should come after the last continue_to() (additionally, if
- * you're using the closure asynchronously a mutex won't work since a mutex has
- * to be unlocked by the same process that locked it).
- *
- * So to make it less error prone and more efficient, we also have the ability
- * to use closures as locks:
- *
- *   closure_init_unlocked();
- *   closure_trylock();
- *
- * That's all we need for trylock() - the last closure_put() implicitly unlocks
- * it for you. But for closure_lock(), we also need a wait list:
- *
- *   struct closure_with_waitlist frobnicator_cl;
- *
- *   closure_init_unlocked(&frobnicator_cl);
- *   closure_lock(&frobnicator_cl);
- *
- * A closure_with_waitlist embeds a closure and a wait list - much like struct
- * delayed_work embeds a work item and a timer_list. The important thing is, use
- * it exactly like you would a regular closure and closure_put() will magically
- * handle everything for you.
  */

 struct closure;
@@ -164,12 +106,6 @@ struct closure_waitlist {
         struct llist_head       list;
 };

-enum closure_type {
-        TYPE_closure                    = 0,
-        TYPE_closure_with_waitlist      = 1,
-        MAX_CLOSURE_TYPE                = 1,
-};
-
 enum closure_state {
         /*
          * CLOSURE_WAITING: Set iff the closure is on a waitlist. Must be set by
@@ -224,8 +160,6 @@ struct closure {
         atomic_t                remaining;

-        enum closure_type       type;
-
 #ifdef CONFIG_BCACHE_CLOSURES_DEBUG
 #define CLOSURE_MAGIC_DEAD      0xc054dead
 #define CLOSURE_MAGIC_ALIVE     0xc054a11e
@@ -237,34 +171,12 @@ struct closure {
 #endif
 };

-struct closure_with_waitlist {
-        struct closure          cl;
-        struct closure_waitlist wait;
-};
-
-extern unsigned invalid_closure_type(void);
-
-#define __CLOSURE_TYPE(cl, _t) \
-        __builtin_types_compatible_p(typeof(cl), struct _t) \
-                ? TYPE_ ## _t : \
-
-#define __closure_type(cl) \
-( \
-        __CLOSURE_TYPE(cl, closure) \
-        __CLOSURE_TYPE(cl, closure_with_waitlist) \
-        invalid_closure_type() \
-)
-
 void closure_sub(struct closure *cl, int v);
 void closure_put(struct closure *cl);
 void __closure_wake_up(struct closure_waitlist *list);
 bool closure_wait(struct closure_waitlist *list, struct closure *cl);
 void closure_sync(struct closure *cl);
-bool closure_trylock(struct closure *cl, struct closure *parent);
-void __closure_lock(struct closure *cl, struct closure *parent,
-                    struct closure_waitlist *wait_list);

 #ifdef CONFIG_BCACHE_CLOSURES_DEBUG

 void closure_debug_init(void);
@@ -293,123 +205,97 @@ static inline void closure_set_ret_ip(struct closure *cl)
 #endif
 }

-static inline void closure_get(struct closure *cl)
+static inline void closure_set_waiting(struct closure *cl, unsigned long f)
 {
 #ifdef CONFIG_BCACHE_CLOSURES_DEBUG
-        BUG_ON((atomic_inc_return(&cl->remaining) &
-                CLOSURE_REMAINING_MASK) <= 1);
-#else
-        atomic_inc(&cl->remaining);
+        cl->waiting_on = f;
 #endif
 }

+static inline void __closure_end_sleep(struct closure *cl)
+{
+        __set_current_state(TASK_RUNNING);
+
+        if (atomic_read(&cl->remaining) & CLOSURE_SLEEPING)
+                atomic_sub(CLOSURE_SLEEPING, &cl->remaining);
+}
+
+static inline void __closure_start_sleep(struct closure *cl)
+{
+        closure_set_ip(cl);
+        cl->task = current;
+        set_current_state(TASK_UNINTERRUPTIBLE);
+
+        if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING))
+                atomic_add(CLOSURE_SLEEPING, &cl->remaining);
+}
+
 static inline void closure_set_stopped(struct closure *cl)
 {
         atomic_sub(CLOSURE_RUNNING, &cl->remaining);
 }

-static inline bool closure_is_unlocked(struct closure *cl)
-{
-        return atomic_read(&cl->remaining) == -1;
-}
-
-static inline void do_closure_init(struct closure *cl, struct closure *parent,
-                                   bool running)
-{
-        cl->parent = parent;
-        if (parent)
-                closure_get(parent);
-
-        if (running) {
-                closure_debug_create(cl);
-                atomic_set(&cl->remaining, CLOSURE_REMAINING_INITIALIZER);
-        } else
-                atomic_set(&cl->remaining, -1);
-
-        closure_set_ip(cl);
-}
-
-/*
- * Hack to get at the embedded closure if there is one, by doing an unsafe cast:
- * the result of __closure_type() is thrown away, it's used merely for type
- * checking.
- */
-#define __to_internal_closure(cl) \
-({ \
-        BUILD_BUG_ON(__closure_type(*cl) > MAX_CLOSURE_TYPE); \
-        (struct closure *) cl; \
-})
-
-#define closure_init_type(cl, parent, running) \
-do { \
-        struct closure *_cl = __to_internal_closure(cl); \
-        _cl->type = __closure_type(*(cl)); \
-        do_closure_init(_cl, parent, running); \
-} while (0)
-
+static inline void set_closure_fn(struct closure *cl, closure_fn *fn,
+                                  struct workqueue_struct *wq)
+{
+        BUG_ON(object_is_on_stack(cl));
+        closure_set_ip(cl);
+        cl->fn = fn;
+        cl->wq = wq;
+        /* between atomic_dec() in closure_put() */
+        smp_mb__before_atomic_dec();
+}
+
+static inline void closure_queue(struct closure *cl)
+{
+        struct workqueue_struct *wq = cl->wq;
+        if (wq) {
+                INIT_WORK(&cl->work, cl->work.func);
+                BUG_ON(!queue_work(wq, &cl->work));
+        } else
+                cl->fn(cl);
+}
+
+/**
+ * closure_get - increment a closure's refcount
+ */
+static inline void closure_get(struct closure *cl)
+{
+#ifdef CONFIG_BCACHE_CLOSURES_DEBUG
+        BUG_ON((atomic_inc_return(&cl->remaining) &
+                CLOSURE_REMAINING_MASK) <= 1);
+#else
+        atomic_inc(&cl->remaining);
+#endif
+}
+
 /**
- * closure_init() - Initialize a closure, setting the refcount to 1
+ * closure_init - Initialize a closure, setting the refcount to 1
  * @cl: closure to initialize
  * @parent: parent of the new closure. cl will take a refcount on it for its
  * lifetime; may be NULL.
  */
-#define closure_init(cl, parent) \
-        closure_init_type(cl, parent, true)
-
-static inline void closure_init_stack(struct closure *cl)
+static inline void closure_init(struct closure *cl, struct closure *parent)
 {
         memset(cl, 0, sizeof(struct closure));
-        atomic_set(&cl->remaining, CLOSURE_REMAINING_INITIALIZER|CLOSURE_STACK);
-}
+        cl->parent = parent;
+        if (parent)
+                closure_get(parent);

-/**
- * closure_init_unlocked() - Initialize a closure but leave it unlocked.
- * @cl: closure to initialize
- *
- * For when the closure will be used as a lock. The closure may not be used
- * until after a closure_lock() or closure_trylock().
- */
-#define closure_init_unlocked(cl) \
-do { \
-        memset((cl), 0, sizeof(*(cl))); \
-        closure_init_type(cl, NULL, false); \
-} while (0)
+        atomic_set(&cl->remaining, CLOSURE_REMAINING_INITIALIZER);

-/**
- * closure_lock() - lock and initialize a closure.
- * @cl: the closure to lock
- * @parent: the new parent for this closure
- *
- * The closure must be of one of the types that has a waitlist (otherwise we
- * wouldn't be able to sleep on contention).
- *
- * @parent has exactly the same meaning as in closure_init(); if non null, the
- * closure will take a reference on @parent which will be released when it is
- * unlocked.
- */
-#define closure_lock(cl, parent) \
-        __closure_lock(__to_internal_closure(cl), parent, &(cl)->wait)
+        closure_debug_create(cl);
+        closure_set_ip(cl);
+}

-static inline void __closure_end_sleep(struct closure *cl)
+static inline void closure_init_stack(struct closure *cl)
 {
-        __set_current_state(TASK_RUNNING);
-
-        if (atomic_read(&cl->remaining) & CLOSURE_SLEEPING)
-                atomic_sub(CLOSURE_SLEEPING, &cl->remaining);
-}
-
-static inline void __closure_start_sleep(struct closure *cl)
-{
-        closure_set_ip(cl);
-        cl->task = current;
-        set_current_state(TASK_UNINTERRUPTIBLE);
-
-        if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING))
-                atomic_add(CLOSURE_SLEEPING, &cl->remaining);
+        memset(cl, 0, sizeof(struct closure));
+        atomic_set(&cl->remaining, CLOSURE_REMAINING_INITIALIZER|CLOSURE_STACK);
 }

 /**
- * closure_wake_up() - wake up all closures on a wait list.
+ * closure_wake_up - wake up all closures on a wait list.
  */
 static inline void closure_wake_up(struct closure_waitlist *list)
 {
@@ -417,69 +303,19 @@ static inline void closure_wake_up(struct closure_waitlist *list)
         __closure_wake_up(list);
 }

-/*
- * Wait on an event, synchronously or asynchronously - analogous to wait_event()
- * but for closures.
- *
- * The loop is oddly structured so as to avoid a race; we must check the
- * condition again after we've added ourself to the waitlist. We know if we were
- * already on the waitlist because closure_wait() returns false; thus, we only
- * schedule or break if closure_wait() returns false. If it returns true, we
- * just loop again - rechecking the condition.
- *
- * The __closure_wake_up() is necessary because we may race with the event
- * becoming true; i.e. we see event false -> wait -> recheck condition, but the
- * thread that made the event true may have called closure_wake_up() before we
- * added ourself to the wait list.
- *
- * We have to call closure_sync() at the end instead of just
- * __closure_end_sleep() because a different thread might've called
- * closure_wake_up() before us and gotten preempted before they dropped the
- * refcount on our closure. If this was a stack allocated closure, that would be
- * bad.
- */
-#define closure_wait_event(list, cl, condition) \
-({ \
-        typeof(condition) ret; \
- \
-        while (1) { \
-                ret = (condition); \
-                if (ret) { \
-                        __closure_wake_up(list); \
-                        closure_sync(cl); \
-                        break; \
-                } \
- \
-                __closure_start_sleep(cl); \
- \
-                if (!closure_wait(list, cl)) \
-                        schedule(); \
-        } \
- \
-        ret; \
-})
-
-static inline void closure_queue(struct closure *cl)
-{
-        struct workqueue_struct *wq = cl->wq;
-        if (wq) {
-                INIT_WORK(&cl->work, cl->work.func);
-                BUG_ON(!queue_work(wq, &cl->work));
-        } else
-                cl->fn(cl);
-}
-
-static inline void set_closure_fn(struct closure *cl, closure_fn *fn,
-                                  struct workqueue_struct *wq)
-{
-        BUG_ON(object_is_on_stack(cl));
-        closure_set_ip(cl);
-        cl->fn = fn;
-        cl->wq = wq;
-        /* between atomic_dec() in closure_put() */
-        smp_mb__before_atomic_dec();
-}
-
+/**
+ * continue_at - jump to another function with barrier
+ *
+ * After @cl is no longer waiting on anything (i.e. all outstanding refs have
+ * been dropped with closure_put()), it will resume execution at @fn running out
+ * of @wq (or, if @wq is NULL, @fn will be called by closure_put() directly).
+ *
+ * NOTE: This macro expands to a return in the calling function!
+ *
+ * This is because after calling continue_at() you no longer have a ref on @cl,
+ * and whatever @cl owns may be freed out from under you - a running closure fn
+ * has a ref on its own closure which continue_at() drops.
+ */
 #define continue_at(_cl, _fn, _wq) \
 do { \
         set_closure_fn(_cl, _fn, _wq); \
@@ -487,8 +323,28 @@ do { \
         return; \
 } while (0)

+/**
+ * closure_return - finish execution of a closure
+ *
+ * This is used to indicate that @cl is finished: when all outstanding refs on
+ * @cl have been dropped @cl's ref on its parent closure (as passed to
+ * closure_init()) will be dropped, if one was specified - thus this can be
+ * thought of as returning to the parent closure.
+ */
 #define closure_return(_cl) continue_at((_cl), NULL, NULL)

+/**
+ * continue_at_nobarrier - jump to another function without barrier
+ *
+ * Causes @fn to be executed out of @cl, in @wq context (or called directly if
+ * @wq is NULL).
+ *
+ * NOTE: like continue_at(), this macro expands to a return in the caller!
+ *
+ * The ref the caller of continue_at_nobarrier() had on @cl is now owned by @fn,
+ * thus it's not safe to touch anything protected by @cl after a
+ * continue_at_nobarrier().
+ */
 #define continue_at_nobarrier(_cl, _fn, _wq) \
 do { \
         set_closure_fn(_cl, _fn, _wq); \
@@ -496,6 +352,15 @@ do { \
         return; \
 } while (0)

+/**
+ * closure_return - finish execution of a closure, with destructor
+ *
+ * Works like closure_return(), except @destructor will be called when all
+ * outstanding refs on @cl have been dropped; @destructor may be used to safely
+ * free the memory occupied by @cl, and it is called with the ref on the parent
+ * closure still held - so @destructor could safely return an item to a
+ * freelist protected by @cl's parent.
+ */
 #define closure_return_with_destructor(_cl, _destructor) \
 do { \
         set_closure_fn(_cl, _destructor, NULL); \
@@ -503,6 +368,13 @@ do { \
         return; \
 } while (0)

+/**
+ * closure_call - execute @fn out of a new, uninitialized closure
+ *
+ * Typically used when running out of one closure, and we want to run @fn
+ * asynchronously out of a new closure - @parent will then wait for @cl to
+ * finish.
+ */
 static inline void closure_call(struct closure *cl, closure_fn fn,
                                 struct workqueue_struct *wq,
                                 struct closure *parent)
@@ -511,12 +383,4 @@ static inline void closure_call(struct closure *cl, closure_fn fn,
         continue_at_nobarrier(cl, fn, wq);
 }

-static inline void closure_trylock_call(struct closure *cl, closure_fn fn,
-                                        struct workqueue_struct *wq,
-                                        struct closure *parent)
-{
-        if (closure_trylock(cl, parent))
-                continue_at_nobarrier(cl, fn, wq);
-}
-
 #endif /* _LINUX_CLOSURE_H */