Commit 14c14661 authored by Sergei Golubchik

wt needs to use its own implementation of rwlocks with reader preference, at least where system rwlocks are fair.

include/my_global.h:
  wt uses mutex-based rwlock implementation unless on linux
include/waiting_threads.h:
  mutex-based rwlock implementation with reader preference
mysys/thr_rwlock.c:
  revert the change. make my_rw_locks fair
mysys/waiting_threads.c:
  mutex-based rwlock implementation with reader preference.
  convert complex multi-line macros to static functions
parent 9fb89454
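
To make the motivation concrete: the deadlock that reader preference avoids is a thread
re-acquiring a read lock on a resource while another thread is already queued for the
write lock. Below is a minimal standalone model of the scheme (plain pthread primitives
and made-up names, not the mysys/wt API; build with cc -pthread). With a fair,
writer-preferring rwlock the nested rd-lock near the end would queue behind the writer
and the program would hang; with reader preference it runs to completion.

  #include <pthread.h>
  #include <stdio.h>
  #include <unistd.h>

  /* reader-preference rwlock modeled on the struct this commit adds to
     WT_RESOURCE; names and layout here are illustrative only */
  struct rp_lock
  {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    unsigned        readers;
    unsigned        write_locked;
  };

  static void rp_rdlock(struct rp_lock *l)
  {
    pthread_mutex_lock(&l->mutex);
    while (l->write_locked)              /* wait only for an *active* writer */
      pthread_cond_wait(&l->cond, &l->mutex);
    l->readers++;
    pthread_mutex_unlock(&l->mutex);
  }

  static void rp_wrlock(struct rp_lock *l)
  {
    pthread_mutex_lock(&l->mutex);
    while (l->write_locked || l->readers)
      pthread_cond_wait(&l->cond, &l->mutex);
    l->write_locked= 1;
    pthread_mutex_unlock(&l->mutex);
  }

  static void rp_unlock(struct rp_lock *l)
  {
    pthread_mutex_lock(&l->mutex);
    if (l->write_locked)
      l->write_locked= 0;
    else
      l->readers--;
    pthread_cond_broadcast(&l->cond);
    pthread_mutex_unlock(&l->mutex);
  }

  static struct rp_lock lock= { PTHREAD_MUTEX_INITIALIZER,
                                PTHREAD_COND_INITIALIZER, 0, 0 };

  static void *writer(void *arg)
  {
    (void) arg;
    rp_wrlock(&lock);                    /* queues behind the active reader */
    puts("writer got the lock");
    rp_unlock(&lock);
    return 0;
  }

  int main(void)
  {
    pthread_t t;
    rp_rdlock(&lock);                    /* outer read lock */
    pthread_create(&t, 0, writer, 0);
    sleep(1);                            /* give the writer time to queue up */
    rp_rdlock(&lock);                    /* nested read: ok, readers bypass
                                            the queued writer */
    puts("nested read lock acquired");
    rp_unlock(&lock);
    rp_unlock(&lock);
    pthread_join(t, 0);
    return 0;
  }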
......@@ -1524,6 +1524,15 @@ inline void operator delete[](void*, void*) { /* Do nothing */ }
*/
#ifdef TARGET_OS_LINUX
#define NEED_EXPLICIT_SYNC_DIR 1
#else
/*
On Linux the default rwlock scheduling policy is good enough for
waiting_threads.c; on other systems use our special implementation
(which is slower).
QQ perhaps this should be tested in configure? How?
*/
#define WT_RWLOCKS_USE_MUTEXES 1
#endif
#if !defined(__cplusplus) && !defined(bool)
......
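As for the QQ above: one way the policy could be probed at configure time (a sketch, not
part of this commit) is to hold a read lock, let a second thread block in
pthread_rwlock_wrlock(), and then check whether pthread_rwlock_tryrdlock() still
succeeds. On reader-preferring implementations (the Linux default) it does; on fair,
writer-preferring ones it fails, which is exactly the case where WT_RWLOCKS_USE_MUTEXES
is needed.

  #include <pthread.h>
  #include <stdio.h>
  #include <unistd.h>

  static pthread_rwlock_t rwl= PTHREAD_RWLOCK_INITIALIZER;

  static void *writer(void *arg)
  {
    (void) arg;
    pthread_rwlock_wrlock(&rwl);     /* blocks: main already holds a read lock */
    pthread_rwlock_unlock(&rwl);
    return 0;
  }

  int main(void)
  {
    pthread_t t;
    int res;
    pthread_rwlock_rdlock(&rwl);
    pthread_create(&t, 0, writer, 0);
    sleep(1);                        /* let the writer block inside wrlock() */
    res= pthread_rwlock_tryrdlock(&rwl);
    if (res == 0)
    {
      puts("recursive rdlock ok: native rwlocks prefer readers");
      pthread_rwlock_unlock(&rwl);   /* release the second read lock */
    }
    else
      puts("busy: native rwlocks prefer writers, WT_RWLOCKS_USE_MUTEXES needed");
    pthread_rwlock_unlock(&rwl);     /* release the first read lock */
    pthread_join(t, 0);
    return res != 0;                 /* exit code usable from a configure test */
  }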
......@@ -67,7 +67,6 @@ extern uint32 wt_success_stats;
e.g. accessing a resource by thd->waiting_for is safe,
a resource cannot be freed as there's a thread waiting for it
*/
typedef struct st_wt_resource {
WT_RESOURCE_ID id;
uint waiter_count;
......@@ -76,11 +75,27 @@ typedef struct st_wt_resource {
pthread_mutex_t *mutex;
#endif
/*
before the 'lock' all elements are mutable, after - immutable
in the sense that lf_hash_insert() won't memcpy() over them.
before the 'lock' all elements are mutable, after (and including) -
immutable in the sense that lf_hash_insert() won't memcpy() over them.
See wt_init().
*/
#ifdef WT_RWLOCKS_USE_MUTEXES
/*
we need a special rwlock-like 'lock' to allow readers to bypass
waiting writers; otherwise readers can deadlock.
writer starvation is technically possible, but unlikely, because
the contention is expected to be low.
*/
struct {
pthread_cond_t cond;
pthread_mutex_t mutex;
uint readers: 16;
uint pending_writers: 15;
uint write_locked: 1;
} lock;
#else
rw_lock_t lock;
#endif
pthread_cond_t cond;
DYNAMIC_ARRAY owners;
} WT_RESOURCE;
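A side note on the new lock member: the three bit-fields add up to 32 bits
(16 + 15 + 1), so on common ABIs the extra state beyond the mutex and condvar is a
single uint. A quick stand-alone check of that packing (the wrapper struct name here
is made up):

  #include <assert.h>

  struct wt_lock_bits              /* same widths as the struct above */
  {
    unsigned readers: 16;
    unsigned pending_writers: 15;
    unsigned write_locked: 1;
  };

  int main(void)
  {
    /* 16 + 15 + 1 bits pack into one unsigned on typical ABIs */
    assert(sizeof(struct wt_lock_bits) == sizeof(unsigned));
    return 0;
  }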
......
......@@ -89,7 +89,7 @@ int my_rw_rdlock(rw_lock_t *rwp)
pthread_mutex_lock(&rwp->lock);
/* active or queued writers */
while (( rwp->state < 0 ))
while ((rwp->state < 0 ) || rwp->waiters)
pthread_cond_wait( &rwp->readers, &rwp->lock);
rwp->state++;
......@@ -101,7 +101,7 @@ int my_rw_tryrdlock(rw_lock_t *rwp)
{
int res;
pthread_mutex_lock(&rwp->lock);
if ((rwp->state < 0 ))
if ((rwp->state < 0 ) || rwp->waiters)
res= EBUSY; /* Can't get lock */
else
{
......
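The two thr_rwlock.c hunks above are the "make my_rw_locks fair" part: a reader now also
waits (or fails, in the try variant) when writers are queued, not only when one is
active. With the encoding the diff implies (state > 0 = number of active readers,
state < 0 = writer active, waiters = number of queued writers) the admission rules look
roughly as follows; this is a paraphrase for illustration, not the actual my_rw_lock_t
code.

  #include <pthread.h>

  typedef struct
  {
    pthread_mutex_t lock;
    pthread_cond_t  readers;   /* readers sleep here */
    pthread_cond_t  writers;   /* writers sleep here */
    int             state;     /* >0: reader count, <0: writer active */
    unsigned        waiters;   /* writers queued on 'writers' */
  } fair_rw_lock;

  static void fair_rdlock(fair_rw_lock *rwp)
  {
    pthread_mutex_lock(&rwp->lock);
    /* the fairness change: also yield to queued writers */
    while (rwp->state < 0 || rwp->waiters)
      pthread_cond_wait(&rwp->readers, &rwp->lock);
    rwp->state++;
    pthread_mutex_unlock(&rwp->lock);
  }

  static void fair_wrlock(fair_rw_lock *rwp)
  {
    pthread_mutex_lock(&rwp->lock);
    rwp->waiters++;                  /* announce the queued writer */
    while (rwp->state)
      pthread_cond_wait(&rwp->writers, &rwp->lock);
    rwp->waiters--;
    rwp->state= -1;
    pthread_mutex_unlock(&rwp->lock);
  }

  static void fair_unlock(fair_rw_lock *rwp)
  {
    pthread_mutex_lock(&rwp->lock);
    if (rwp->state < 0)
      rwp->state= 0;                 /* writer done */
    else
      rwp->state--;                  /* one reader fewer */
    if (rwp->state == 0)
    {
      /* wake everyone; readers re-check 'waiters' and step aside if
         another writer is already queued */
      pthread_cond_broadcast(&rwp->writers);
      pthread_cond_broadcast(&rwp->readers);
    }
    pthread_mutex_unlock(&rwp->lock);
  }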
......@@ -133,56 +133,105 @@ uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats;
static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
#define increment_success_stats() \
do { \
my_atomic_rwlock_wrlock(&success_stats_lock); \
my_atomic_add32(&wt_success_stats, 1); \
my_atomic_rwlock_wrunlock(&success_stats_lock); \
} while (0)
#define increment_cycle_stats(X,SLOT) \
do { \
uint i= (X); \
if (i >= WT_CYCLE_STATS) \
i= WT_CYCLE_STATS; \
my_atomic_rwlock_wrlock(&cycle_stats_lock); \
my_atomic_add32(&wt_cycle_stats[SLOT][i], 1); \
my_atomic_rwlock_wrunlock(&cycle_stats_lock); \
} while (0)
#define increment_wait_stats(X,RET) \
do { \
uint i; \
if ((RET) == ETIMEDOUT) \
i= WT_WAIT_STATS; \
else \
{ \
ulonglong w=(X)/10; \
for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \
} \
my_atomic_rwlock_wrlock(&wait_stats_lock); \
my_atomic_add32(wt_wait_stats+i, 1); \
my_atomic_rwlock_wrunlock(&wait_stats_lock); \
} while (0)
#define rc_rdlock(X) \
do { \
WT_RESOURCE *R=(X); \
DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value)); \
rw_rdlock(&R->lock); \
} while (0)
#define rc_wrlock(X) \
do { \
WT_RESOURCE *R=(X); \
DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value)); \
rw_wrlock(&R->lock); \
} while (0)
#define rc_unlock(X) \
do { \
WT_RESOURCE *R=(X); \
DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value)); \
rw_unlock(&R->lock); \
} while (0)
static void increment_success_stats()
{
my_atomic_rwlock_wrlock(&success_stats_lock);
my_atomic_add32(&wt_success_stats, 1);
my_atomic_rwlock_wrunlock(&success_stats_lock);
}
static void increment_cycle_stats(uint depth, uint slot)
{
if (depth >= WT_CYCLE_STATS)
depth= WT_CYCLE_STATS;
my_atomic_rwlock_wrlock(&cycle_stats_lock);
my_atomic_add32(&wt_cycle_stats[slot][depth], 1);
my_atomic_rwlock_wrunlock(&cycle_stats_lock);
}
static void increment_wait_stats(ulonglong waited,int ret)
{
uint i;
if ((ret) == ETIMEDOUT)
i= WT_WAIT_STATS;
else
for (i=0; i < WT_WAIT_STATS && waited/10 > wt_wait_table[i]; i++) ;
my_atomic_rwlock_wrlock(&wait_stats_lock);
my_atomic_add32(wt_wait_stats+i, 1);
my_atomic_rwlock_wrunlock(&wait_stats_lock);
}
#ifdef WT_RWLOCKS_USE_MUTEXES
static void rc_rwlock_init(WT_RESOURCE *rc)
{
pthread_cond_init(&rc->lock.cond, 0);
pthread_mutex_init(&rc->lock.mutex, MY_MUTEX_INIT_FAST);
}
static void rc_rwlock_destroy(WT_RESOURCE *rc)
{
pthread_cond_destroy(&rc->lock.cond);
pthread_mutex_destroy(&rc->lock.mutex);
}
static void rc_rdlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
pthread_mutex_lock(&rc->lock.mutex);
while (rc->lock.write_locked)
pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
rc->lock.readers++;
pthread_mutex_unlock(&rc->lock.mutex);
DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value));
}
static void rc_wrlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
pthread_mutex_lock(&rc->lock.mutex);
while (rc->lock.write_locked || rc->lock.readers)
pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
rc->lock.write_locked=1;
pthread_mutex_unlock(&rc->lock.mutex);
DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
}
static void rc_unlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value));
pthread_mutex_lock(&rc->lock.mutex);
if (rc->lock.write_locked)
{
rc->lock.write_locked=0;
pthread_cond_broadcast(&rc->lock.cond);
}
else if (--rc->lock.readers == 0)
pthread_cond_broadcast(&rc->lock.cond);
pthread_mutex_unlock(&rc->lock.mutex);
}
#else
static void rc_rwlock_init(WT_RESOURCE *rc)
{
my_rwlock_init(&rc->lock, 0);
}
static void rc_rwlock_destroy(WT_RESOURCE *rc)
{
rwlock_destroy(&rc->lock);
}
static void rc_rdlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
rw_rdlock(&rc->lock);
DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value));
}
static void rc_wrlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
rw_wrlock(&rc->lock);
DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
}
static void rc_unlock(WT_RESOURCE *rc)
{
DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value));
rw_unlock(&rc->lock);
}
#endif
/*
All resources are stored in a lock-free hash. Different threads
......@@ -202,7 +251,7 @@ static void wt_resource_init(uchar *arg)
DBUG_ENTER("wt_resource_init");
bzero(rc, sizeof(*rc));
my_rwlock_init(&rc->lock, 0);
rc_rwlock_init(rc);
pthread_cond_init(&rc->cond, 0);
my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 0, 5);
DBUG_VOID_RETURN;
......@@ -220,7 +269,7 @@ static void wt_resource_destroy(uchar *arg)
DBUG_ENTER("wt_resource_destroy");
DBUG_ASSERT(rc->owners.elements == 0);
rwlock_destroy(&rc->lock);
rc_rwlock_destroy(rc);
pthread_cond_destroy(&rc->cond);
delete_dynamic(&rc->owners);
DBUG_VOID_RETURN;
......@@ -490,7 +539,7 @@ retry:
}
end:
/*
Note that 'rc' is locked in this function, but it's never unlocked there.
Note that 'rc' is locked in this function, but it's never unlocked here.
Instead it's saved in arg->rc and the *caller* is expected to unlock it.
It's done to support different killing strategies. This is how it works:
Assuming a graph
......@@ -549,6 +598,7 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
struct deadlock_arg arg= {thd, max_depth, 0, 0};
int ret;
DBUG_ENTER("deadlock");
DBUG_ASSERT(depth < 2);
ret= deadlock_search(&arg, blocker, depth);
if (ret == WT_DEPTH_EXCEEDED)
{
......@@ -688,8 +738,8 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
LF_REQUIRE_PINS(3);
DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu",
thd->name, blocker->name, resid->value));
DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%lu",
thd->name, blocker->name, (ulong)resid->value));
if (fix_thd_pins(thd))
DBUG_RETURN(WT_DEADLOCK);
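The last hunk only changes tracing: resid->value used to be printed with %llu, which
not every printf behind DBUG_PRINT can be relied on to understand (presumably the
reason for the change), so the value is now narrowed explicitly. The same idiom in
isolation, with plain printf standing in for DBUG_PRINT:

  #include <stdio.h>

  int main(void)
  {
    unsigned long long resid= 42;
    /* works even where printf lacks %llu; note the value is truncated
       if it does not fit in an unsigned long */
    printf("resid=%lu\n", (unsigned long) resid);
    return 0;
  }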
......