Commit 14969154 authored by Rich Prohaska's avatar Rich Prohaska

#56 simply launch of lock escalator and write a slightly better test

parent b9452de1
......@@ -318,27 +318,22 @@ void locktree::manager::run_escalation_for_test(void) {
void locktree::manager::run_escalation(void) {
uint64_t t0 = toku_current_time_microsec();
if (1) {
// run escalation on the background thread
int r;
toku_mutex_lock(&m_escalator_mutex);
toku_cond_broadcast(&m_escalator_work);
struct timeval tv;
r = gettimeofday(&tv, 0);
assert_zero(r);
uint64_t t = tv.tv_sec * 1000000 + tv.tv_usec;
t += 100000; // 100 milliseconds
toku_timespec_t wakeup_time;
wakeup_time.tv_sec = t / 1000000;
wakeup_time.tv_nsec = (t % 1000000) * 1000;
r = toku_cond_timedwait(&m_escalator_done, &m_escalator_mutex, &wakeup_time);
toku_mutex_unlock(&m_escalator_mutex);
} else {
// run escalation on this thread
mutex_lock();
escalate_all_locktrees();
mutex_unlock();
}
// run escalation on the background thread
int r;
toku_mutex_lock(&m_escalator_mutex);
toku_cond_broadcast(&m_escalator_work);
struct timeval tv;
r = gettimeofday(&tv, 0);
assert_zero(r);
uint64_t t = tv.tv_sec * 1000000 + tv.tv_usec;
t += 100000; // 100 milliseconds
toku_timespec_t wakeup_time;
wakeup_time.tv_sec = t / 1000000;
wakeup_time.tv_nsec = (t % 1000000) * 1000;
r = toku_cond_timedwait(&m_escalator_done, &m_escalator_mutex, &wakeup_time);
toku_mutex_unlock(&m_escalator_mutex);
// else run escalation on this thread
// mutex_lock(); escalate_all_locktrees(); mutex_unlock();
uint64_t t1 = toku_current_time_microsec();
add_escalator_wait_time(t1 - t0);
}
......
......@@ -96,6 +96,7 @@ PATENT RIGHTS GRANT:
using namespace toku;
static int verbose = 0;
static int killed = 0;
static void locktree_release_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64_t right_k) {
range_buffer buffer;
......@@ -114,23 +115,26 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64
return lt->acquire_write_lock(txn_id, &left, &right, nullptr);
}
static void run_big_txn(locktree::manager *mgr, locktree *lt, TXNID txn_id) {
mgr = mgr;
for (int64_t i = 0; 1; i++) {
static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id) {
int64_t last_i = -1;
for (int64_t i = 0; !killed; i++) {
uint64_t t_start = toku_current_time_microsec();
int r = locktree_write_lock(lt, txn_id, i, i);
assert(r == 0);
last_i = i;
uint64_t t_end = toku_current_time_microsec();
uint64_t t_duration = t_end - t_start;
if (t_duration > 100000) {
printf("%u %s %" PRId64 " %" PRIu64 "\n", toku_os_gettid(), __FUNCTION__, i, t_duration);
}
toku_pthread_yield();
}
if (last_i != -1)
locktree_release_lock(lt, txn_id, 0, last_i); // release the range 0 .. last_i
}
static void run_small_txn(locktree::manager *mgr, locktree *lt, TXNID txn_id, int64_t k) {
mgr = mgr;
for (int64_t i = 0; 1; i++) {
static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t k) {
for (int64_t i = 0; !killed; i++) {
uint64_t t_start = toku_current_time_microsec();
int r = locktree_write_lock(lt, txn_id, k, k);
assert(r == 0);
......@@ -139,7 +143,8 @@ static void run_small_txn(locktree::manager *mgr, locktree *lt, TXNID txn_id, in
if (t_duration > 100000) {
printf("%u %s %" PRId64 " %" PRIu64 "\n", toku_os_gettid(), __FUNCTION__, i, t_duration);
}
locktree_release_lock(lt, txn_id, i, i);
locktree_release_lock(lt, txn_id, k, k);
toku_pthread_yield();
}
}
......@@ -167,9 +172,38 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff
printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra);
}
static uint64_t get_escalation_count(locktree::manager &mgr) {
LTM_STATUS_S ltm_status;
mgr.get_status(&ltm_status);
TOKU_ENGINE_STATUS_ROW key_status = NULL;
// lookup keyname in status
for (int i = 0; ; i++) {
TOKU_ENGINE_STATUS_ROW status = &ltm_status.status[i];
if (status->keyname == NULL)
break;
if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) {
key_status = status;
break;
}
}
assert(key_status);
return key_status->value.num;
}
int main(int argc, const char *argv[]) {
if (argc == 1 || argv == nullptr)
return 0;
uint64_t stalls = 0;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--verbose") == 0) {
verbose++;
continue;
}
if (strcmp(argv[i], "--stalls") == 0 && i+1 < argc) {
stalls = atoll(argv[++i]);
continue;
}
}
int r;
// create a manager
......@@ -186,6 +220,7 @@ int main(int argc, const char *argv[]) {
DICTIONARY_ID dict_id_1 = { 2 };
locktree *lt_1 = mgr.get_lt(dict_id_1, desc_1, compare_dbts, nullptr);
// create the worker threads
struct arg big_arg = { &mgr, lt_0, 1000 };
pthread_t big_id;
r = toku_pthread_create(&big_id, nullptr, big_f, &big_arg);
......@@ -201,6 +236,13 @@ int main(int argc, const char *argv[]) {
assert(r == 0);
}
// wait for some escalations to occur
while (get_escalation_count(mgr) < stalls) {
sleep(1);
}
killed = 1;
// cleanup
void *ret;
r = toku_pthread_join(big_id, &ret);
assert(r == 0);
......@@ -210,7 +252,6 @@ int main(int argc, const char *argv[]) {
assert(r == 0);
}
// cleanup
mgr.release_lt(lt_0);
mgr.release_lt(lt_1);
mgr.destroy();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment