Commit 3e2b3f89 authored by John Esmet

fixes #206 Factor out inner classes to be their own classes. Relax some
abstractions and strengthen others, with an eye for simplicity and
consistency, not over-encapsulation.

parent c1cc6c36
@@ -338,7 +338,7 @@ int lock_request::retry(void) {
 }
 
 void lock_request::retry_all_lock_requests(locktree *lt) {
-    locktree::lt_lock_request_info *info = lt->get_lock_request_info();
+    lt_lock_request_info *info = lt->get_lock_request_info();
     // if a thread reads this bit to be true, then it should go ahead and
     // take the locktree mutex and retry lock requests. we use this bit
...
@@ -202,7 +202,7 @@ private:
     // the lock request info state stored in the
     // locktree that this lock request is for.
-    struct locktree::lt_lock_request_info *m_info;
+    struct lt_lock_request_info *m_info;
 
     // effect: tries again to acquire the lock described by this lock request
     // returns: 0 if retrying the request succeeded and is now complete
...
@@ -116,10 +116,9 @@ namespace toku {
 // but does nothing based on the value of the reference count - it is
 // up to the user of the locktree to destroy it when it sees fit.
-void locktree::create(manager::memory_tracker *mem_tracker, DICTIONARY_ID dict_id,
+void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
                       DESCRIPTOR desc, ft_compare_func cmp) {
-    m_mem_tracker = mem_tracker;
-    m_mgr = mem_tracker->get_manager();
+    m_mgr = mgr;
     m_dict_id = dict_id;
 
     // the only reason m_cmp is malloc'd here is to prevent gdb from printing
@@ -164,6 +163,18 @@ void locktree::destroy(void) {
     m_lock_request_info.pending_lock_requests.destroy();
 }
 
+void locktree::add_reference(void) {
+    (void) toku_sync_add_and_fetch(&m_reference_count, 1);
+}
+
+uint32_t locktree::release_reference(void) {
+    return toku_sync_sub_and_fetch(&m_reference_count, 1);
+}
+
+uint32_t locktree::get_reference_count(void) {
+    return m_reference_count;
+}
+
 // a container for a range/txnid pair
 struct row_lock {
     keyrange range;
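The new add_reference/release_reference pair uses the toku_sync_* atomics and, per the comment above, deliberately does nothing based on the count's value: destroy-on-zero policy is left to the locktree's user. A minimal sketch of the intended caller-side pattern follows; the helper name is hypothetical, and the real logic lives in locktree_manager::release_lt():

    // Hypothetical caller-side release pattern (sketch, not from this commit).
    static void release_and_maybe_destroy(locktree *lt) {
        // release_reference() atomically decrements and returns the new count,
        // so exactly one caller observes zero and tears the locktree down.
        uint32_t refs = lt->release_reference();
        if (refs == 0) {
            lt->destroy();
            // freeing the locktree object itself is also the caller's job
        }
    }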
@@ -174,9 +185,8 @@ struct row_lock {
 // storing each row lock into the given growable array. the
 // caller does not own the range inside the returned row locks,
 // so remove from the tree with care using them as keys.
-static void iterate_and_get_overlapping_row_locks(
-        const concurrent_tree::locked_keyrange *lkr,
-        GrowableArray<row_lock> *row_locks) {
+static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_keyrange *lkr,
+                                                  GrowableArray<row_lock> *row_locks) {
     struct copy_fn_obj {
         GrowableArray<row_lock> *row_locks;
         bool fn(const keyrange &range, TXNID txnid) {
@@ -193,7 +203,7 @@ static void iterate_and_get_overlapping_row_locks(
 // which txnids are conflicting, and store them in the conflicts
 // set, if given.
 static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_locks,
-        const TXNID &txnid, txnid_set *conflicts) {
+                                         const TXNID &txnid, txnid_set *conflicts) {
     bool conflicts_exist = false;
     const size_t num_overlaps = row_locks.get_size();
     for (size_t i = 0; i < num_overlaps; i++) {
@@ -218,19 +228,23 @@ static uint64_t row_lock_size_in_tree(const row_lock &lock) {
 // remove and destroy the given row lock from the locked keyrange,
 // then notify the memory tracker of the newly freed lock.
 static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
-        const row_lock &lock, locktree::manager::memory_tracker *mem_tracker) {
+                                      const row_lock &lock, locktree_manager *mgr) {
     const uint64_t mem_released = row_lock_size_in_tree(lock);
     lkr->remove(lock.range);
-    mem_tracker->note_mem_released(mem_released);
+    if (mgr != nullptr) {
+        mgr->note_mem_released(mem_released);
+    }
 }
 
 // insert a row lock into the locked keyrange, then notify
 // the memory tracker of this newly acquired lock.
 static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
-        const row_lock &lock, locktree::manager::memory_tracker *mem_tracker) {
+                                      const row_lock &lock, locktree_manager *mgr) {
     uint64_t mem_used = row_lock_size_in_tree(lock);
     lkr->insert(lock.range, lock.txnid);
-    mem_tracker->note_mem_used(mem_used);
+    if (mgr != nullptr) {
+        mgr->note_mem_used(mem_used);
+    }
 }
 
 void locktree::sto_begin(TXNID txnid) {
@@ -247,12 +261,16 @@ void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
     buffer_mem = m_sto_buffer.get_num_bytes();
     m_sto_buffer.append(left_key, right_key);
     delta = m_sto_buffer.get_num_bytes() - buffer_mem;
-    m_mem_tracker->note_mem_used(delta);
+    if (m_mgr != nullptr) {
+        m_mgr->note_mem_used(delta);
+    }
 }
 
 void locktree::sto_end(void) {
     uint64_t num_bytes = m_sto_buffer.get_num_bytes();
-    m_mem_tracker->note_mem_released(num_bytes);
+    if (m_mgr != nullptr) {
+        m_mgr->note_mem_released(num_bytes);
+    }
     m_sto_buffer.destroy();
     m_sto_buffer.create();
     m_sto_txnid = TXNID_NONE;
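Each of these hunks guards the memory-accounting calls with a null check, so a locktree can now exist without a manager and simply skip note_mem_used/note_mem_released. A sketch of what that appears to enable — a standalone locktree, e.g. in a unit test; dict_id, desc and cmp are placeholder setup values:

    // Sketch: standalone locktree with no manager (placeholder setup values).
    locktree lt;
    lt.create(nullptr /* no manager: accounting calls become no-ops */,
              dict_id, desc, cmp);
    // ... acquire and release locks as usual ...
    lt.destroy();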
@@ -314,8 +332,9 @@ void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
     invariant(!m_rangetree->is_empty());
 }
 
-bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
-                               const DBT *left_key, const DBT *right_key) {
+bool locktree::sto_try_acquire(void *prepared_lkr,
+                               TXNID txnid,
+                               const DBT *left_key, const DBT *right_key) {
     if (m_rangetree->is_empty() && m_sto_buffer.is_empty() && m_sto_score >= STO_SCORE_THRESHOLD) {
         // We can do the optimization because the rangetree is empty, and
         // we know its worth trying because the sto score is big enough.
@@ -344,8 +363,10 @@ bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
 // try to acquire a lock and consolidate it with existing locks if possible
 // param: lkr, a prepared locked keyrange
 // return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
-int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
-        const DBT *left_key, const DBT *right_key, txnid_set *conflicts) {
+int locktree::acquire_lock_consolidated(void *prepared_lkr,
+                                        TXNID txnid,
+                                        const DBT *left_key, const DBT *right_key,
+                                        txnid_set *conflicts) {
     int r = 0;
     concurrent_tree::locked_keyrange *lkr;
@@ -361,8 +382,8 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
     size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
 
     // if any overlapping row locks conflict with this request, bail out.
-    bool conflicts_exist = determine_conflicting_txnids(
-        overlapping_row_locks, txnid, conflicts);
+    bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks,
+                                                        txnid, conflicts);
     if (!conflicts_exist) {
         // there are no conflicts, so all of the overlaps are for the requesting txnid.
         // so, we must consolidate all existing overlapping ranges and the requested
@@ -371,11 +392,11 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
             row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
             invariant(overlapping_lock.txnid == txnid);
             requested_range.extend(m_cmp, overlapping_lock.range);
-            remove_row_lock_from_tree(lkr, overlapping_lock, m_mem_tracker);
+            remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr);
         }
 
         row_lock new_lock = { .range = requested_range, .txnid = txnid };
-        insert_row_lock_into_tree(lkr, new_lock, m_mem_tracker);
+        insert_row_lock_into_tree(lkr, new_lock, m_mgr);
     } else {
         r = DB_LOCK_NOTGRANTED;
     }
@@ -388,8 +409,10 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
 // acquire a lock in the given key range, inclusive. if successful,
 // return 0. otherwise, populate the conflicts txnid_set with the set of
 // transactions that conflict with this request.
-int locktree::acquire_lock(bool is_write_request, TXNID txnid,
-        const DBT *left_key, const DBT *right_key, txnid_set *conflicts) {
+int locktree::acquire_lock(bool is_write_request,
+                           TXNID txnid,
+                           const DBT *left_key, const DBT *right_key,
+                           txnid_set *conflicts) {
     int r = 0;
 
     // we are only supporting write locks for simplicity
@@ -410,12 +433,15 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid,
     return r;
 }
 
-int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
-        const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn) {
+int locktree::try_acquire_lock(bool is_write_request,
+                               TXNID txnid,
+                               const DBT *left_key, const DBT *right_key,
+                               txnid_set *conflicts, bool big_txn) {
     // All ranges in the locktree must have left endpoints <= right endpoints.
     // Range comparisons rely on this fact, so we make a paranoid invariant here.
     paranoid_invariant(m_cmp->compare(left_key, right_key) <= 0);
-    int r = m_mgr->check_current_lock_constraints(big_txn);
+    int r = m_mgr == nullptr ? 0 :
+        m_mgr->check_current_lock_constraints(big_txn);
     if (r == 0) {
         r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
     }
@@ -423,18 +449,19 @@ int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
 }
 
 // the locktree silently upgrades read locks to write locks for simplicity
-int locktree::acquire_read_lock(TXNID txnid,
-        const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn) {
+int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
+                                txnid_set *conflicts, bool big_txn) {
     return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn);
 }
 
-int locktree::acquire_write_lock(TXNID txnid,
-        const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn) {
+int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
+                                 txnid_set *conflicts, bool big_txn) {
     return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
 }
 
-void locktree::get_conflicts(bool is_write_request, TXNID txnid,
-        const DBT *left_key, const DBT *right_key, txnid_set *conflicts) {
+void locktree::get_conflicts(bool is_write_request,
+                             TXNID txnid, const DBT *left_key, const DBT *right_key,
+                             txnid_set *conflicts) {
     // because we only support write locks, ignore this bit for now.
     (void) is_write_request;
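Because acquire_read_lock() simply forwards to acquire_write_lock(), read locks are exclusive in practice: a second txnid requesting a read lock on an overlapping range conflicts. A small sketch of the observable behavior, assuming a locktree lt, two txnids and keys prepared elsewhere:

    // Sketch: read locks are silently upgraded, so they cannot be shared.
    txnid_set conflicts;
    conflicts.create();
    int r = lt->acquire_read_lock(txn_a, left_key, right_key, &conflicts, false);
    invariant_zero(r);  // txn A now effectively holds a write lock
    r = lt->acquire_read_lock(txn_b, left_key, right_key, &conflicts, false);
    invariant(r == DB_LOCK_NOTGRANTED);  // txn B is blocked by txn A's lock
    conflicts.destroy();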
@@ -483,8 +510,8 @@ void locktree::get_conflicts(bool is_write_request, TXNID txnid,
 // whole lock [1,3]. Now, someone else can lock 2 before our txn gets
 // around to unlocking 2, so we should not remove that lock.
 void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
-        const DBT *left_key, const DBT *right_key) {
+                                                  const DBT *left_key,
+                                                  const DBT *right_key) {
     keyrange release_range;
     release_range.create(left_key, right_key);
@@ -504,7 +531,7 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
         // If this isn't our lock, that's ok, just don't remove it.
         // See rationale above.
         if (lock.txnid == txnid) {
-            remove_row_lock_from_tree(&lkr, lock, m_mem_tracker);
+            remove_row_lock_from_tree(&lkr, lock, m_mgr);
         }
     }
@@ -574,8 +601,8 @@ void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
 // row locks, storing each one into the given array of size N,
 // then removing each extracted lock from the locked keyrange.
 static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
-        locktree::manager::memory_tracker *mem_tracker,
-        row_lock *row_locks, int num_to_extract) {
+                                     locktree_manager *mgr,
+                                     row_lock *row_locks, int num_to_extract) {
     struct extract_fn_obj {
         int num_extracted;
@@ -606,7 +633,7 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
     int num_extracted = extract_fn.num_extracted;
     invariant(num_extracted <= num_to_extract);
     for (int i = 0; i < num_extracted; i++) {
-        remove_row_lock_from_tree(lkr, row_locks[i], mem_tracker);
+        remove_row_lock_from_tree(lkr, row_locks[i], mgr);
     }
 
     return num_extracted;
@@ -638,7 +665,7 @@ struct txnid_range_buffer {
 // approach works well. if there are many txnids and each
 // has locks in a random/alternating order, then this does
 // not work so well.
-void locktree::escalate(manager::lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
+void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
     omt<struct txnid_range_buffer, struct txnid_range_buffer *> range_buffers;
     range_buffers.create();
@@ -664,8 +691,9 @@ void locktree::escalate(manager::lt_escalate_cb after_escalate_callback, void *a
     // we always remove the "first" n because we are removing n
     // each time we do an extraction. so this loops until its empty.
-    while ((num_extracted = extract_first_n_row_locks(&lkr, m_mem_tracker,
-                    extracted_buf, num_row_locks_per_batch)) > 0) {
+    while ((num_extracted =
+                extract_first_n_row_locks(&lkr, m_mgr, extracted_buf,
+                                          num_row_locks_per_batch)) > 0) {
         int current_index = 0;
         while (current_index < num_extracted) {
             // every batch of extracted locks is in range-sorted order. search
@@ -733,7 +761,7 @@ void locktree::escalate(manager::lt_escalate_cb after_escalate_callback, void *a
             keyrange range;
             range.create(rec.get_left_key(), rec.get_right_key());
             row_lock lock = { .range = range, .txnid = current_txnid };
-            insert_row_lock_into_tree(&lkr, lock, m_mem_tracker);
+            insert_row_lock_into_tree(&lkr, lock, m_mgr);
             iter.next();
         }
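The batch-extraction loop works best when one txnid holds many contiguous locks, as the comment above notes: a whole run can collapse into a single covering range. A self-contained illustration of that coalescing idea — an analogy only, since the real code operates on keyrange batches and per-txnid range buffers:

    #include <algorithm>
    #include <utility>
    #include <vector>

    // Coalesce range-sorted [left, right] pairs owned by a single txnid.
    static std::vector<std::pair<int, int>>
    coalesce(const std::vector<std::pair<int, int>> &sorted_ranges) {
        std::vector<std::pair<int, int>> out;
        for (const auto &r : sorted_ranges) {
            if (!out.empty() && r.first <= out.back().second + 1) {
                // adjacent or overlapping: grow the covering range
                out.back().second = std::max(out.back().second, r.second);
            } else {
                out.push_back(r);
            }
        }
        return out;
    }
    // e.g. {[1,2], [3,4], [5,6]} coalesces to {[1,6]}: one lock instead of three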
@@ -748,7 +776,7 @@ void locktree::escalate(manager::lt_escalate_cb after_escalate_callback, void *a
     lkr.release();
 }
 
-void *locktree::get_userdata(void) {
+void *locktree::get_userdata(void) const {
     return m_userdata;
 }
@@ -756,7 +784,7 @@ void locktree::set_userdata(void *userdata) {
     m_userdata = userdata;
 }
 
-struct locktree::lt_lock_request_info *locktree::get_lock_request_info(void) {
+struct lt_lock_request_info *locktree::get_lock_request_info(void) {
     return &m_lock_request_info;
 }
@@ -764,11 +792,11 @@ void locktree::set_descriptor(DESCRIPTOR desc) {
     m_cmp->set_descriptor(desc);
 }
 
-locktree::manager::memory_tracker *locktree::get_mem_tracker(void) const {
-    return m_mem_tracker;
+locktree_manager *locktree::get_manager(void) const {
+    return m_mgr;
 }
 
-int locktree::compare(const locktree *lt) {
+int locktree::compare(const locktree *lt) const {
     if (m_dict_id.dictid < lt->m_dict_id.dictid) {
         return -1;
     } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
...
@@ -89,8 +89,7 @@ PATENT RIGHTS GRANT:
 #ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
 #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
 
-#ifndef TOKU_LOCKTREE_H
-#define TOKU_LOCKTREE_H
+#pragma once
 
 #include <db.h>
 #include <toku_time.h>
@@ -135,70 +134,31 @@ typedef struct {
 namespace toku {
 
+class locktree;
+class locktree_manager;
 class lock_request;
+class memory_tracker;
 class concurrent_tree;
 
-// A locktree represents the set of row locks owned by all transactions
-// over an open dictionary. Read and write ranges are represented as
-// a left and right key which are compared with the given descriptor
-// and comparison fn.
-//
-// Locktrees are not created and destroyed by the user. Instead, they are
-// referenced and released using the locktree manager.
-//
-// A sample workflow looks like this:
-// - Create a manager.
-// - Get a locktree by dictionaroy id from the manager.
-// - Perform read/write lock acquision on the locktree, add references to
-//   the locktree using the manager, release locks, release references, etc.
-// - ...
-// - Release the final reference to the locktree. It will be destroyed.
-// - Destroy the manager.
-
-class locktree {
-public:
-    // effect: Attempts to grant a read lock for the range of keys between [left_key, right_key].
-    // returns: If the lock cannot be granted, return DB_LOCK_NOTGRANTED, and populate the
-    //          given conflicts set with the txnids that hold conflicting locks in the range.
-    //          If the locktree cannot create more locks, return TOKUDB_OUT_OF_LOCKS.
-    // note: Read locks cannot be shared between txnids, as one would expect.
-    //       This is for simplicity since read locks are rare in MySQL.
-    int acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn);
-
-    // effect: Attempts to grant a write lock for the range of keys between [left_key, right_key].
-    // returns: If the lock cannot be granted, return DB_LOCK_NOTGRANTED, and populate the
-    //          given conflicts set with the txnids that hold conflicting locks in the range.
-    //          If the locktree cannot create more locks, return TOKUDB_OUT_OF_LOCKS.
-    int acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn);
-
-    // effect: populate the conflicts set with the txnids that would preventing
-    //         the given txnid from getting a lock on [left_key, right_key]
-    void get_conflicts(bool is_write_request, TXNID txnid,
-                       const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
-
-    // effect: Release all of the lock ranges represented by the range buffer for a txnid.
-    void release_locks(TXNID txnid, const range_buffer *ranges);
-
-    // returns: The userdata associated with this locktree, or null if it has not been set.
-    void *get_userdata(void);
-
-    void set_userdata(void *userdata);
-
-    void set_descriptor(DESCRIPTOR desc);
-
-    int compare(const locktree *lt);
-
-    DICTIONARY_ID get_dict_id() const;
-
-    struct lt_counters {
-        uint64_t wait_count, wait_time;
-        uint64_t long_wait_count, long_wait_time;
-        uint64_t timeout_count;
-    };
+typedef int (*lt_create_cb)(locktree *lt, void *extra);
+typedef void (*lt_destroy_cb)(locktree *lt);
+typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt, const range_buffer &buffer, void *extra);
+
+struct lt_counters {
+    uint64_t wait_count, wait_time;
+    uint64_t long_wait_count, long_wait_time;
+    uint64_t timeout_count;
+
+    void add(const lt_counters &rhs) {
+        wait_count += rhs.wait_count;
+        wait_time += rhs.wait_time;
+        long_wait_count += rhs.long_wait_count;
+        long_wait_time += rhs.long_wait_time;
+        timeout_count += rhs.timeout_count;
+    }
+};
 
-    // The locktree stores some data for lock requests. It doesn't have to know
-    // how they work or even what a lock request object looks like.
-    struct lt_lock_request_info {
-        omt<lock_request *> pending_lock_requests;
-        toku_mutex_t mutex;
+// Lock request state for some locktree
+struct lt_lock_request_info {
+    omt<lock_request *> pending_lock_requests;
+    toku_mutex_t mutex;
@@ -206,42 +166,15 @@ public:
     lt_counters counters;
 };
 
-    // Private info struct for storing pending lock request state.
-    // Only to be used by lock requests. We store it here as
-    // something less opaque than usual to strike a tradeoff between
-    // abstraction and code complexity. It is still fairly abstract
-    // since the lock_request object is opaque
-    struct lt_lock_request_info *get_lock_request_info(void);
-
-    class manager;
-
-    // the escalator coordinates escalation on a set of locktrees for a bunch of threads
-    class escalator {
-    public:
-        void create(void);
-        void destroy(void);
-        void run(manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra);
-    private:
-        toku_mutex_t m_escalator_mutex;
-        toku_cond_t m_escalator_done;
-        bool m_escalator_running;
-    };
-    ENSURE_POD(escalator);
-
-    // The locktree manager manages a set of locktrees,
-    // one for each open dictionary. Locktrees are accessed through
-    // the manager, and when they are no longer needed, they can
-    // be released by the user.
-    class manager {
-    public:
-        typedef int (*lt_create_cb)(locktree *lt, void *extra);
-        typedef void (*lt_destroy_cb)(locktree *lt);
-        typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt, const range_buffer &buffer, void *extra);
-
-        // note: create_cb is called just after a locktree is first created.
-        //       destroy_cb is called just before a locktree is destroyed.
-        void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb, void *extra);
+// The locktree manager manages a set of locktrees, one for each open dictionary.
+// Locktrees are retrieved from the manager. When they are no longer needed, they
+// are released by the user.
+class locktree_manager {
+public:
+    // param: create_cb, called just after a locktree is first created.
+    //        destroy_cb, called just before a locktree is destroyed.
+    //        escalate_cb, called after a locktree is escalated (with extra param)
+    void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *extra);
 
     void destroy(void);
@@ -252,10 +185,9 @@ public:
     // effect: Get a locktree from the manager. If a locktree exists with the given
     //         dict_id, it is referenced and then returned. If one did not exist, it
     //         is created. It will use the given descriptor and comparison function
-    //         for comparing keys, and the on_create callback passed to manager::create()
+    //         for comparing keys, and the on_create callback passed to locktree_manager::create()
     //         will be called with the given extra parameter.
-    locktree *get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc, ft_compare_func cmp,
-            void *on_create_extra);
+    locktree *get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc, ft_compare_func cmp, void *on_create_extra);
 
     void reference_lt(locktree *lt);
@@ -263,54 +195,6 @@ public:
     //         to zero, the on_destroy callback is called before it gets destroyed.
     void release_lt(locktree *lt);
 
-    // The memory tracker is employed by the manager to take care of
-    // maintaining the current number of locks and lock memory and run
-    // escalation if necessary.
-    //
-    // To do this, the manager hands out a memory tracker reference to each
-    // locktree it creates, so that the locktrees can notify the memory
-    // tracker when locks are acquired and released.
-    class memory_tracker {
-    public:
-        void set_manager(manager *mgr);
-        manager *get_manager(void);
-
-        // effect: Determines if too many locks or too much memory is being used,
-        //         Runs escalation on the manager if so.
-        // returns: 0 if there enough resources to create a new lock, or TOKUDB_OUT_OF_LOCKS
-        //          if there are not enough resources and lock escalation failed to free up
-        //          enough resources for a new lock.
-        int check_current_lock_constraints(void);
-
-        bool over_big_threshold(void);
-
-        void note_mem_used(uint64_t mem_used);
-
-        void note_mem_released(uint64_t mem_freed);
-
-    private:
-        manager *m_mgr;
-
-        // returns: true if the manager of this memory tracker currently
-        //          has more locks or lock memory than it is allowed.
-        // note: this is a lock-less read, and it is ok for the caller to
-        //       get false when they should have gotten true as long as
-        //       a subsequent call gives the correct answer.
-        //
-        //       in general, if the tracker says the manager is not out of
-        //       locks, you are clear to add O(1) locks to the system.
-        bool out_of_locks(void) const;
-    };
-    ENSURE_POD(memory_tracker);
-
-    // effect: calls the private function run_escalation(), only ok to
-    //         do for tests.
-    // rationale: to get better stress test coverage, we want a way to
-    //            deterministicly trigger lock escalation.
-    void run_escalation_for_test(void);
-
-    void run_escalation(void);
-
     void get_status(LTM_STATUS status);
 
     // effect: calls the iterate function on each pending lock request
@@ -324,10 +208,21 @@ public:
                                                   void *extra);
     int iterate_pending_lock_requests(lock_request_iterate_callback cb, void *extra);
 
+    // effect: Determines if too many locks or too much memory is being used,
+    //         Runs escalation on the manager if so.
+    // param: big_txn, if the current transaction is 'big' (has spilled rollback logs)
+    // returns: 0 if there enough resources to create a new lock, or TOKUDB_OUT_OF_LOCKS
+    //          if there are not enough resources and lock escalation failed to free up
+    //          enough resources for a new lock.
     int check_current_lock_constraints(bool big_txn);
 
-    // Escalate locktrees touched by a txn
-    void escalate_lock_trees_for_txn(TXNID, locktree *lt);
+    bool over_big_threshold(void);
+
+    void note_mem_used(uint64_t mem_used);
+    void note_mem_released(uint64_t mem_freed);
+
+    bool out_of_locks(void) const;
 
     // Escalate all locktrees
     void escalate_all_locktrees(void);
@@ -335,6 +230,13 @@ public:
     // Escalate a set of locktrees
     void escalate_locktrees(locktree **locktrees, int num_locktrees);
 
+    // effect: calls the private function run_escalation(), only ok to
+    //         do for tests.
+    // rationale: to get better stress test coverage, we want a way to
+    //            deterministicly trigger lock escalation.
+    void run_escalation_for_test(void);
+    void run_escalation(void);
+
     // Add time t to the escalator's wait time statistics
     void add_escalator_wait_time(uint64_t t);
@@ -344,7 +246,7 @@ public:
     // tracks the current number of locks and lock memory
     uint64_t m_max_lock_memory;
     uint64_t m_current_lock_memory;
-    memory_tracker m_mem_tracker;
+    memory_tracker *m_mem_tracker;
 
     struct lt_counters m_lt_counters;
@@ -367,22 +269,14 @@ public:
     void status_init(void);
 
-    // effect: Gets a locktree from the map.
-    // requires: Manager's mutex is held
+    // Manage the set of open locktrees
     locktree *locktree_map_find(const DICTIONARY_ID &dict_id);
-
-    // effect: Puts a locktree into the map.
-    // requires: Manager's mutex is held
     void locktree_map_put(locktree *lt);
-
-    // effect: Removes a locktree from the map.
-    // requires: Manager's mutex is held
     void locktree_map_remove(locktree *lt);
 
     static int find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id);
 
     void escalator_init(void);
     void escalator_destroy(void);
 
     // statistics about lock escalation.
@@ -395,214 +289,295 @@ public:
     uint64_t m_long_wait_escalation_count;
     uint64_t m_long_wait_escalation_time;
 
-    escalator m_escalator;
+    // the escalator coordinates escalation on a set of locktrees for a bunch of threads
+    class locktree_escalator {
+    public:
+        void create(void);
+        void destroy(void);
+        void run(locktree_manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra);
+
+    private:
+        toku_mutex_t m_escalator_mutex;
+        toku_cond_t m_escalator_done;
+        bool m_escalator_running;
+    };
+
+    locktree_escalator m_escalator;
 
     friend class manager_unit_test;
 };
-    ENSURE_POD(manager);
-
-    manager::memory_tracker *get_mem_tracker(void) const;
-
-private:
-    manager *m_mgr;
-    manager::memory_tracker *m_mem_tracker;
-
-    DICTIONARY_ID m_dict_id;
-
-    // use a comparator object that encapsulates an ft compare
-    // function and a descriptor in a fake db. this way we can
-    // pass it around for easy key comparisons.
-    //
-    // since this comparator will store a pointer to a descriptor,
-    // the user of the locktree needs to make sure that the descriptor
-    // is valid for as long as the locktree. this is currently
-    // implemented by opening an ft_handle for this locktree and
-    // storing it as userdata below.
-    comparator *m_cmp;
-
-    uint32_t m_reference_count;
-
-    concurrent_tree *m_rangetree;
-
-    void *m_userdata;
-
-    struct lt_lock_request_info m_lock_request_info;
-
-    // The following fields and members prefixed with "sto_" are for
-    // the single txnid optimization, intended to speed up the case
-    // when only one transaction is using the locktree. If we know
-    // the locktree has only one transaction, then acquiring locks
-    // takes O(1) work and releasing all locks takes O(1) work.
-    //
-    // How do we know that the locktree only has a single txnid?
-    // What do we do if it does?
-    //
-    // When a txn with txnid T requests a lock:
-    // - If the tree is empty, the optimization is possible. Set the single
-    //   txnid to T, and insert the lock range into the buffer.
-    // - If the tree is not empty, check if the single txnid is T. If so,
-    //   append the lock range to the buffer. Otherwise, migrate all of
-    //   the locks in the buffer into the rangetree on behalf of txnid T,
-    //   and invalid the single txnid.
-    //
-    // When a txn with txnid T releases its locks:
-    // - If the single txnid is valid, it must be for T. Destroy the buffer.
-    // - If it's not valid, release locks the normal way in the rangetree.
-    //
-    // To carry out the optimization we need to record a single txnid
-    // and a range buffer for each locktree, each protected by the root
-    // lock of the locktree's rangetree. The root lock for a rangetree
-    // is grabbed by preparing a locked keyrange on the rangetree.
-    TXNID m_sto_txnid;
-    range_buffer m_sto_buffer;
-
-    // The single txnid optimization speeds up the case when only one
-    // transaction is using the locktree. But it has the potential to
-    // hurt the case when more than one txnid exists.
-    //
-    // There are two things we need to do to make the optimization only
-    // optimize the case we care about, and not hurt the general case.
-    //
-    // Bound the worst-case latency for lock migration when the
-    // optimization stops working:
-    // - Idea: Stop the optimization and migrate immediate if we notice
-    //   the single txnid has takes many locks in the range buffer.
-    // - Implementation: Enforce a max size on the single txnid range buffer.
-    // - Analysis: Choosing the perfect max value, M, is difficult to do
-    //   without some feedback from the field. Intuition tells us that M should
-    //   not be so small that the optimization is worthless, and it should not
-    //   be so big that it's unreasonable to have to wait behind a thread doing
-    //   the work of converting M buffer locks into rangetree locks.
-    //
-    // Prevent concurrent-transaction workloads from trying the optimization
-    // in vain:
-    // - Idea: Don't even bother trying the optimization if we think the
-    //   system is in a concurrent-transaction state.
-    // - Implementation: Do something even simpler than detecting whether the
-    //   system is in a concurent-transaction state. Just keep a "score" value
-    //   and some threshold. If at any time the locktree is eligible for the
-    //   optimization, only do it if the score is at this threshold. When you
-    //   actually do the optimization but someone has to migrate locks in the buffer
-    //   (expensive), then reset the score back to zero. Each time a txn
-    //   releases locks, the score is incremented by 1.
-    // - Analysis: If you let the threshold be "C", then at most 1 / C txns will
-    //   do the optimization in a concurrent-transaction system. Similarly, it
-    //   takes at most C txns to start using the single txnid optimzation, which
-    //   is good when the system transitions from multithreaded to single threaded.
-    //
-    // STO_BUFFER_MAX_SIZE:
-    //
-    // We choose the max value to be 1 million since most transactions are smaller
-    // than 1 million and we can create a rangetree of 1 million elements in
-    // less than a second. So we can be pretty confident that this threshold
-    // enables the optimization almost always, and prevents super pathological
-    // latency issues for the first lock taken by a second thread.
-    //
-    // STO_SCORE_THRESHOLD:
-    //
-    // A simple first guess at a good value for the score threshold is 100.
-    // By our analysis, we'd end up doing the optimization in vain for
-    // around 1% of all transactions, which seems reasonable. Further,
-    // if the system goes single threaded, it ought to be pretty quick
-    // for 100 transactions to go by, so we won't have to wait long before
-    // we start doing the single txind optimzation again.
-    static const int STO_BUFFER_MAX_SIZE = 50 * 1024;
-    static const int STO_SCORE_THRESHOLD = 100;
-    int m_sto_score;
-
-    // statistics about time spent ending the STO early
-    uint64_t m_sto_end_early_count;
-    tokutime_t m_sto_end_early_time;
-
-    // effect: begins the single txnid optimizaiton, setting m_sto_txnid
-    //         to the given txnid.
-    // requires: m_sto_txnid is invalid
-    void sto_begin(TXNID txnid);
-
-    // effect: append a range to the sto buffer
-    // requires: m_sto_txnid is valid
-    void sto_append(const DBT *left_key, const DBT *right_key);
-
-    // effect: ends the single txnid optimization, releaseing any memory
-    //         stored in the sto buffer, notifying the tracker, and
-    //         invalidating m_sto_txnid.
-    // requires: m_sto_txnid is valid
-    void sto_end(void);
-
-    // params: prepared_lkr is a void * to a prepared locked keyrange. see below.
-    // effect: ends the single txnid optimization early, migrating buffer locks
-    //         into the rangetree, calling sto_end(), and then setting the
-    //         sto_score back to zero.
-    // requires: m_sto_txnid is valid
-    void sto_end_early(void *prepared_lkr);
-    void sto_end_early_no_accounting(void *prepared_lkr);
-
-    // params: prepared_lkr is a void * to a prepared locked keyrange. we can't use
-    //         the real type because the compiler won't allow us to forward declare
-    //         concurrent_tree::locked_keyrange without including concurrent_tree.h,
-    //         which we cannot do here because it is a template implementation.
-    // requires: the prepared locked keyrange is for the locktree's rangetree
-    // requires: m_sto_txnid is valid
-    // effect: migrates each lock in the single txnid buffer into the locktree's
-    //         rangetree, notifying the memory tracker as necessary.
-    void sto_migrate_buffer_ranges_to_tree(void *prepared_lkr);
-
-    // effect: If m_sto_txnid is valid, then release the txnid's locks
-    //         by ending the optimization.
-    // requires: If m_sto_txnid is valid, it is equal to the given txnid
-    // returns: True if locks were released for this txnid
-    bool sto_try_release(TXNID txnid);
-
-    // params: prepared_lkr is a void * to a prepared locked keyrange. see above.
-    // requires: the prepared locked keyrange is for the locktree's rangetree
-    // effect: If m_sto_txnid is valid and equal to the given txnid, then
-    //         append a range onto the buffer. Otherwise, if m_sto_txnid is valid
-    //         but not equal to this txnid, then migrate the buffer's locks
-    //         into the rangetree and end the optimization, setting the score
-    //         back to zero.
-    // returns: true if the lock was acquired for this txnid
-    bool sto_try_acquire(void *prepared_lkr, TXNID txnid,
-            const DBT *left_key, const DBT *right_key);
-
-    // Effect:
-    //    Provides a hook for a helgrind suppression.
-    // Returns:
-    //    true if m_sto_txnid is not TXNID_NONE
-    bool sto_txnid_is_valid_unsafe(void) const;
-
-    // Effect:
-    //    Provides a hook for a helgrind suppression.
-    // Returns:
-    //    m_sto_score
-    int sto_get_score_unsafe(void )const;
-
-    // effect: Creates a locktree that uses the given memory tracker
-    //         to report memory usage and honor memory constraints.
-    void create(manager::memory_tracker *mem_tracker, DICTIONARY_ID dict_id,
-                DESCRIPTOR desc, ft_compare_func cmp);
-
-    void destroy(void);
-
-    void remove_overlapping_locks_for_txnid(TXNID txnid,
-            const DBT *left_key, const DBT *right_key);
-
-    int acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
-            const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
-
-    int acquire_lock(bool is_write_request, TXNID txnid,
-            const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
-
-    int try_acquire_lock(bool is_write_request, TXNID txnid,
-            const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn);
-
-    void escalate(manager::lt_escalate_cb after_escalate_callback, void *extra);
-
-    friend class locktree_unit_test;
-    friend class manager_unit_test;
-    friend class lock_request_unit_test;
-};
-ENSURE_POD(locktree);
-
-} /* namespace toku */
-
-#endif /* TOKU_LOCKTREE_H */
+// A locktree represents the set of row locks owned by all transactions
+// over an open dictionary. Read and write ranges are represented as
+// a left and right key which are compared with the given descriptor
+// and comparison fn.
+//
+// Locktrees are not created and destroyed by the user. Instead, they are
+// referenced and released using the locktree manager.
+//
+// A sample workflow looks like this:
+// - Create a manager.
+// - Get a locktree by dictionaroy id from the manager.
+// - Perform read/write lock acquision on the locktree, add references to
+//   the locktree using the manager, release locks, release references, etc.
+// - ...
+// - Release the final reference to the locktree. It will be destroyed.
+// - Destroy the manager.
+class locktree {
+public:
+    // effect: Creates a locktree that uses the given memory tracker
+    //         to report memory usage and honor memory constraints.
+    void create(locktree_manager *mgr, DICTIONARY_ID dict_id,
+                DESCRIPTOR desc, ft_compare_func cmp);
+
+    void destroy(void);
+
+    // For thread-safe, external reference counting
+    void add_reference(void);
+
+    // requires: the reference count is > 0
+    // returns: the reference count, after decrementing it by one
+    uint32_t release_reference(void);
+
+    // returns: the current reference count
+    uint32_t get_reference_count(void);
+
+    // effect: Attempts to grant a read lock for the range of keys between [left_key, right_key].
+    // returns: If the lock cannot be granted, return DB_LOCK_NOTGRANTED, and populate the
+    //          given conflicts set with the txnids that hold conflicting locks in the range.
+    //          If the locktree cannot create more locks, return TOKUDB_OUT_OF_LOCKS.
+    // note: Read locks cannot be shared between txnids, as one would expect.
+    //       This is for simplicity since read locks are rare in MySQL.
+    int acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn);
+
+    // effect: Attempts to grant a write lock for the range of keys between [left_key, right_key].
+    // returns: If the lock cannot be granted, return DB_LOCK_NOTGRANTED, and populate the
+    //          given conflicts set with the txnids that hold conflicting locks in the range.
+    //          If the locktree cannot create more locks, return TOKUDB_OUT_OF_LOCKS.
+    int acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key, txnid_set *conflicts, bool big_txn);
+
+    // effect: populate the conflicts set with the txnids that would preventing
+    //         the given txnid from getting a lock on [left_key, right_key]
+    void get_conflicts(bool is_write_request, TXNID txnid,
+                       const DBT *left_key, const DBT *right_key, txnid_set *conflicts);
+
+    // effect: Release all of the lock ranges represented by the range buffer for a txnid.
+    void release_locks(TXNID txnid, const range_buffer *ranges);
+
+    // effect: Runs escalation on this locktree
+    void escalate(lt_escalate_cb after_escalate_callback, void *extra);
+
+    // returns: The userdata associated with this locktree, or null if it has not been set.
+    void *get_userdata(void) const;
+
+    void set_userdata(void *userdata);
+
+    locktree_manager *get_manager(void) const;
+
+    void set_descriptor(DESCRIPTOR desc);
+
+    int compare(const locktree *lt) const;
+
+    DICTIONARY_ID get_dict_id() const;
+
+    // Private info struct for storing pending lock request state.
+    // Only to be used by lock requests. We store it here as
+    // something less opaque than usual to strike a tradeoff between
+    // abstraction and code complexity. It is still fairly abstract
+    // since the lock_request object is opaque
+    struct lt_lock_request_info *get_lock_request_info(void);
+
+private:
+    locktree_manager *m_mgr;
+
+    DICTIONARY_ID m_dict_id;
+
+    uint32_t m_reference_count;
+
+    // use a comparator object that encapsulates an ft compare
+    // function and a descriptor in a fake db. this way we can
+    // pass it around for easy key comparisons.
+    //
+    // since this comparator will store a pointer to a descriptor,
+    // the user of the locktree needs to make sure that the descriptor
+    // is valid for as long as the locktree. this is currently
+    // implemented by opening an ft_handle for this locktree and
+    // storing it as userdata below.
+    comparator *m_cmp;
+
+    concurrent_tree *m_rangetree;
+
+    void *m_userdata;
+
+    struct lt_lock_request_info m_lock_request_info;
+
+    // The following fields and members prefixed with "sto_" are for
+    // the single txnid optimization, intended to speed up the case
+    // when only one transaction is using the locktree. If we know
+    // the locktree has only one transaction, then acquiring locks
+    // takes O(1) work and releasing all locks takes O(1) work.
+    //
+    // How do we know that the locktree only has a single txnid?
+    // What do we do if it does?
+    //
+    // When a txn with txnid T requests a lock:
+    // - If the tree is empty, the optimization is possible. Set the single
+    //   txnid to T, and insert the lock range into the buffer.
+    // - If the tree is not empty, check if the single txnid is T. If so,
+    //   append the lock range to the buffer. Otherwise, migrate all of
+    //   the locks in the buffer into the rangetree on behalf of txnid T,
+    //   and invalid the single txnid.
+    //
+    // When a txn with txnid T releases its locks:
+    // - If the single txnid is valid, it must be for T. Destroy the buffer.
+    // - If it's not valid, release locks the normal way in the rangetree.
+    //
+    // To carry out the optimization we need to record a single txnid
+    // and a range buffer for each locktree, each protected by the root
+    // lock of the locktree's rangetree. The root lock for a rangetree
+    // is grabbed by preparing a locked keyrange on the rangetree.
+    TXNID m_sto_txnid;
+    range_buffer m_sto_buffer;
+
+    // The single txnid optimization speeds up the case when only one
+    // transaction is using the locktree. But it has the potential to
+    // hurt the case when more than one txnid exists.
+    //
+    // There are two things we need to do to make the optimization only
+    // optimize the case we care about, and not hurt the general case.
+    //
+    // Bound the worst-case latency for lock migration when the
+    // optimization stops working:
+    // - Idea: Stop the optimization and migrate immediate if we notice
+    //   the single txnid has takes many locks in the range buffer.
+    // - Implementation: Enforce a max size on the single txnid range buffer.
+    // - Analysis: Choosing the perfect max value, M, is difficult to do
+    //   without some feedback from the field. Intuition tells us that M should
+    //   not be so small that the optimization is worthless, and it should not
+    //   be so big that it's unreasonable to have to wait behind a thread doing
+    //   the work of converting M buffer locks into rangetree locks.
+    //
+    // Prevent concurrent-transaction workloads from trying the optimization
+    // in vain:
+    // - Idea: Don't even bother trying the optimization if we think the
+    //   system is in a concurrent-transaction state.
+    // - Implementation: Do something even simpler than detecting whether the
+    //   system is in a concurent-transaction state. Just keep a "score" value
+    //   and some threshold. If at any time the locktree is eligible for the
+    //   optimization, only do it if the score is at this threshold. When you
+    //   actually do the optimization but someone has to migrate locks in the buffer
+    //   (expensive), then reset the score back to zero. Each time a txn
+    //   releases locks, the score is incremented by 1.
+    // - Analysis: If you let the threshold be "C", then at most 1 / C txns will
+    //   do the optimization in a concurrent-transaction system. Similarly, it
+    //   takes at most C txns to start using the single txnid optimzation, which
+    //   is good when the system transitions from multithreaded to single threaded.
+    //
+    // STO_BUFFER_MAX_SIZE:
+    //
+    // We choose the max value to be 1 million since most transactions are smaller
+    // than 1 million and we can create a rangetree of 1 million elements in
+    // less than a second. So we can be pretty confident that this threshold
+    // enables the optimization almost always, and prevents super pathological
+    // latency issues for the first lock taken by a second thread.
+    //
+    // STO_SCORE_THRESHOLD:
+    //
+    // A simple first guess at a good value for the score threshold is 100.
+    // By our analysis, we'd end up doing the optimization in vain for
+    // around 1% of all transactions, which seems reasonable. Further,
+    // if the system goes single threaded, it ought to be pretty quick
+    // for 100 transactions to go by, so we won't have to wait long before
+    // we start doing the single txind optimzation again.
+    static const int STO_BUFFER_MAX_SIZE = 50 * 1024;
+    static const int STO_SCORE_THRESHOLD = 100;
+    int m_sto_score;
+
+    // statistics about time spent ending the STO early
+    uint64_t m_sto_end_early_count;
+    tokutime_t m_sto_end_early_time;
+
+    // effect: begins the single txnid optimizaiton, setting m_sto_txnid
+    //         to the given txnid.
+    // requires: m_sto_txnid is invalid
+    void sto_begin(TXNID txnid);
+
+    // effect: append a range to the sto buffer
+    // requires: m_sto_txnid is valid
+    void sto_append(const DBT *left_key, const DBT *right_key);
+
+    // effect: ends the single txnid optimization, releaseing any memory
+    //         stored in the sto buffer, notifying the tracker, and
+    //         invalidating m_sto_txnid.
+    // requires: m_sto_txnid is valid
+    void sto_end(void);
+
+    // params: prepared_lkr is a void * to a prepared locked keyrange. see below.
+    // effect: ends the single txnid optimization early, migrating buffer locks
+    //         into the rangetree, calling sto_end(), and then setting the
+    //         sto_score back to zero.
+    // requires: m_sto_txnid is valid
+    void sto_end_early(void *prepared_lkr);
+    void sto_end_early_no_accounting(void *prepared_lkr);
+
+    // params: prepared_lkr is a void * to a prepared locked keyrange. we can't use
+    //         the real type because the compiler won't allow us to forward declare
+    //         concurrent_tree::locked_keyrange without including concurrent_tree.h,
+    //         which we cannot do here because it is a template implementation.
+    // requires: the prepared locked keyrange is for the locktree's rangetree
+    // requires: m_sto_txnid is valid
+    // effect: migrates each lock in the single txnid buffer into the locktree's
+    //         rangetree, notifying the memory tracker as necessary.
+    void sto_migrate_buffer_ranges_to_tree(void *prepared_lkr);
+
+    // effect: If m_sto_txnid is valid, then release the txnid's locks
+    //         by ending the optimization.
+    // requires: If m_sto_txnid is valid, it is equal to the given txnid
+    // returns: True if locks were released for this txnid
+    bool sto_try_release(TXNID txnid);
+
+    // params: prepared_lkr is a void * to a prepared locked keyrange. see above.
+    // requires: the prepared locked keyrange is for the locktree's rangetree
+    // effect: If m_sto_txnid is valid and equal to the given txnid, then
+    //         append a range onto the buffer. Otherwise, if m_sto_txnid is valid
+    //         but not equal to this txnid, then migrate the buffer's locks
+    //         into the rangetree and end the optimization, setting the score
+    //         back to zero.
+    // returns: true if the lock was acquired for this txnid
+    bool sto_try_acquire(void *prepared_lkr, TXNID txnid,
+                         const DBT *left_key, const DBT *right_key);
+
+    // Effect:
+    //    Provides a hook for a helgrind suppression.
+    // Returns:
+    //    true if m_sto_txnid is not TXNID_NONE
+    bool sto_txnid_is_valid_unsafe(void) const;
+
+    // Effect:
+    //    Provides a hook for a helgrind suppression.
+    // Returns:
+    //    m_sto_score
+    int sto_get_score_unsafe(void )const;
+
+    void remove_overlapping_locks_for_txnid(TXNID txnid,
+                                            const DBT *left_key, const DBT *right_key);
+
+    int acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
+                                  const DBT *left_key, const DBT *right_key,
+                                  txnid_set *conflicts);
+
+    int acquire_lock(bool is_write_request, TXNID txnid,
+                     const DBT *left_key, const DBT *right_key,
+                     txnid_set *conflicts);
+
+    int try_acquire_lock(bool is_write_request, TXNID txnid,
+                         const DBT *left_key, const DBT *right_key,
+                         txnid_set *conflicts, bool big_txn);
+
+    friend class locktree_unit_test;
+    friend class manager_unit_test;
+    friend class lock_request_unit_test;
+
+    // engine status reaches into the locktree to read some stats
+    friend void locktree_manager::get_status(LTM_STATUS status);
+};
+
+} /* namespace toku */
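With the classes factored out, the "sample workflow" comment above maps onto the public API roughly as follows. A sketch assuming null callbacks are permitted and with all keys, ids and the comparison function prepared elsewhere; error handling omitted:

    // Sketch of the documented manager/locktree workflow.
    locktree_manager mgr;
    mgr.create(nullptr /* create_cb */, nullptr /* destroy_cb */,
               nullptr /* escalate_cb */, nullptr /* extra */);

    // get_lt() creates or references the locktree for this dictionary
    locktree *lt = mgr.get_lt(dict_id, desc, cmp, nullptr /* on_create_extra */);

    txnid_set conflicts;
    conflicts.create();
    if (lt->acquire_write_lock(txnid, left_key, right_key, &conflicts, false) == 0) {
        // ... do work under the lock, then release all ranges for this txnid
        range_buffer ranges;
        ranges.create();
        ranges.append(left_key, right_key);
        lt->release_locks(txnid, &ranges);
        ranges.destroy();
    }
    conflicts.destroy();

    mgr.release_lt(lt);  // releasing the final reference destroys the locktree
    mgr.destroy();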
@@ -100,10 +100,9 @@ PATENT RIGHTS GRANT:
 namespace toku {
 
-void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
+void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
     m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
     m_current_lock_memory = 0;
-    m_mem_tracker.set_manager(this);
 
     m_locktree_map.create();
     m_lt_create_callback = create_cb;
@@ -120,7 +119,7 @@ void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb,
     escalator_init();
 }
 
-void locktree::manager::destroy(void) {
+void locktree_manager::destroy(void) {
     escalator_destroy();
     invariant(m_current_lock_memory == 0);
     invariant(m_locktree_map.size() == 0);
@@ -128,19 +127,19 @@ void locktree::manager::destroy(void) {
     toku_mutex_destroy(&m_mutex);
 }
 
-void locktree::manager::mutex_lock(void) {
+void locktree_manager::mutex_lock(void) {
     toku_mutex_lock(&m_mutex);
 }
 
-void locktree::manager::mutex_unlock(void) {
+void locktree_manager::mutex_unlock(void) {
     toku_mutex_unlock(&m_mutex);
 }
 
-size_t locktree::manager::get_max_lock_memory(void) {
+size_t locktree_manager::get_max_lock_memory(void) {
     return m_max_lock_memory;
 }
 
-int locktree::manager::set_max_lock_memory(size_t max_lock_memory) {
+int locktree_manager::set_max_lock_memory(size_t max_lock_memory) {
     int r = 0;
     mutex_lock();
     if (max_lock_memory < m_current_lock_memory) {
@@ -152,39 +151,39 @@ int locktree::manager::set_max_lock_memory(size_t max_lock_memory) {
     return r;
 }
 
-int locktree::manager::find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id) {
-    if (lt->m_dict_id.dictid < dict_id.dictid) {
+int locktree_manager::find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id) {
+    if (lt->get_dict_id().dictid < dict_id.dictid) {
         return -1;
-    } else if (lt->m_dict_id.dictid == dict_id.dictid) {
+    } else if (lt->get_dict_id().dictid == dict_id.dictid) {
         return 0;
     } else {
         return 1;
     }
 }
 
-locktree *locktree::manager::locktree_map_find(const DICTIONARY_ID &dict_id) {
+locktree *locktree_manager::locktree_map_find(const DICTIONARY_ID &dict_id) {
     locktree *lt;
     int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(dict_id, &lt, nullptr);
     return r == 0 ? lt : nullptr;
 }
 
-void locktree::manager::locktree_map_put(locktree *lt) {
-    int r = m_locktree_map.insert<DICTIONARY_ID, find_by_dict_id>(lt, lt->m_dict_id, nullptr);
+void locktree_manager::locktree_map_put(locktree *lt) {
+    int r = m_locktree_map.insert<DICTIONARY_ID, find_by_dict_id>(lt, lt->get_dict_id(), nullptr);
     invariant_zero(r);
 }
 
-void locktree::manager::locktree_map_remove(locktree *lt) {
+void locktree_manager::locktree_map_remove(locktree *lt) {
     uint32_t idx;
     locktree *found_lt;
     int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(
-        lt->m_dict_id, &found_lt, &idx);
+        lt->get_dict_id(), &found_lt, &idx);
     invariant_zero(r);
     invariant(found_lt == lt);
     r = m_locktree_map.delete_at(idx);
     invariant_zero(r);
 }
 
-locktree *locktree::manager::get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc,
+locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc,
ft_compare_func cmp, void *on_create_extra) { ft_compare_func cmp, void *on_create_extra) {
// hold the mutex around searching and maybe // hold the mutex around searching and maybe
...@@ -194,15 +193,14 @@ locktree *locktree::manager::get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc, ...@@ -194,15 +193,14 @@ locktree *locktree::manager::get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc,
locktree *lt = locktree_map_find(dict_id); locktree *lt = locktree_map_find(dict_id);
if (lt == nullptr) { if (lt == nullptr) {
XCALLOC(lt); XCALLOC(lt);
lt->create(&m_mem_tracker, dict_id, desc, cmp); lt->create(this, dict_id, desc, cmp);
invariant(lt->m_reference_count == 1);
// new locktree created - call the on_create callback // new locktree created - call the on_create callback
// and put it in the locktree map // and put it in the locktree map
if (m_lt_create_callback) { if (m_lt_create_callback) {
int r = m_lt_create_callback(lt, on_create_extra); int r = m_lt_create_callback(lt, on_create_extra);
if (r != 0) { if (r != 0) {
(void) toku_sync_sub_and_fetch(&lt->m_reference_count, 1); lt->release_reference();
lt->destroy(); lt->destroy();
toku_free(lt); toku_free(lt);
lt = nullptr; lt = nullptr;
...@@ -220,7 +218,7 @@ locktree *locktree::manager::get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc, ...@@ -220,7 +218,7 @@ locktree *locktree::manager::get_lt(DICTIONARY_ID dict_id, DESCRIPTOR desc,
return lt; return lt;
} }
void locktree::manager::reference_lt(locktree *lt) { void locktree_manager::reference_lt(locktree *lt) {
// increment using a sync fetch and add. // increment using a sync fetch and add.
// the caller guarantees that the lt won't be // the caller guarantees that the lt won't be
// destroyed while we increment the count here. // destroyed while we increment the count here.
...@@ -231,20 +229,12 @@ void locktree::manager::reference_lt(locktree *lt) { ...@@ -231,20 +229,12 @@ void locktree::manager::reference_lt(locktree *lt) {
// if the manager's mutex is held, it is ok for the // if the manager's mutex is held, it is ok for the
// reference count to transition from 0 to 1 (no race), // reference count to transition from 0 to 1 (no race),
// since we're serialized with other opens and closes. // since we're serialized with other opens and closes.
toku_sync_fetch_and_add(&lt->m_reference_count, 1); lt->add_reference();
} }
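// usage sketch (assumed caller pattern): a locktree is pinned while its
// dictionary is open and released on close; the last release may destroy it:
//
//     locktree *lt = mgr.get_lt(dict_id, desc, cmp, nullptr);  // ref >= 1
//     // ... acquire and release row locks on lt ...
//     mgr.release_lt(lt);  // frees lt if this was the last reference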
static void add_lt_counters(locktree::lt_counters *x, locktree::lt_counters *y) { void locktree_manager::release_lt(locktree *lt) {
x->wait_count += y->wait_count;
x->wait_time += y->wait_time;
x->long_wait_count += y->long_wait_count;
x->long_wait_time += y->long_wait_time;
x->timeout_count += y->timeout_count;
}
void locktree::manager::release_lt(locktree *lt) {
bool do_destroy = false; bool do_destroy = false;
DICTIONARY_ID dict_id = lt->m_dict_id; DICTIONARY_ID dict_id = lt->get_dict_id();
// Release a reference on the locktree. If the count transitions to zero, // Release a reference on the locktree. If the count transitions to zero,
// then we *may* need to do the cleanup. // then we *may* need to do the cleanup.
...@@ -274,7 +264,7 @@ void locktree::manager::release_lt(locktree *lt) { ...@@ -274,7 +264,7 @@ void locktree::manager::release_lt(locktree *lt) {
// This way, if many threads transition the same locktree's reference count // This way, if many threads transition the same locktree's reference count
// from 1 to zero and wait behind the manager's mutex, only one of them will // from 1 to zero and wait behind the manager's mutex, only one of them will
// do the actual destroy and the others will happily do nothing. // do the actual destroy and the others will happily do nothing.
uint32_t refs = toku_sync_sub_and_fetch(&lt->m_reference_count, 1); uint32_t refs = lt->release_reference();
if (refs == 0) { if (refs == 0) {
mutex_lock(); mutex_lock();
locktree *find_lt = locktree_map_find(dict_id); locktree *find_lt = locktree_map_find(dict_id);
...@@ -284,12 +274,12 @@ void locktree::manager::release_lt(locktree *lt) { ...@@ -284,12 +274,12 @@ void locktree::manager::release_lt(locktree *lt) {
// If the reference count is zero, it's our responsibility to remove // If the reference count is zero, it's our responsibility to remove
// it and do the destroy. Otherwise, someone still wants it. // it and do the destroy. Otherwise, someone still wants it.
invariant(find_lt == lt); invariant(find_lt == lt);
if (lt->m_reference_count == 0) { if (lt->get_reference_count() == 0) {
locktree_map_remove(lt); locktree_map_remove(lt);
do_destroy = true; do_destroy = true;
} }
} }
add_lt_counters(&m_lt_counters, &lt->m_lock_request_info.counters); m_lt_counters.add(lt->get_lock_request_info()->counters);
mutex_unlock(); mutex_unlock();
} }
...@@ -303,21 +293,22 @@ void locktree::manager::release_lt(locktree *lt) { ...@@ -303,21 +293,22 @@ void locktree::manager::release_lt(locktree *lt) {
} }
} }
// test-only version of lock escalation void locktree_manager::run_escalation(void) {
static void manager_run_escalation_fun(void *extra) { struct escalation_fn {
locktree::manager *thismanager = (locktree::manager *) extra; static void run(void *extra) {
thismanager->escalate_all_locktrees(); locktree_manager *mgr = (locktree_manager *) extra;
mgr->escalate_all_locktrees();
};
};
m_escalator.run(this, escalation_fn::run, this);
} }
void locktree::manager::run_escalation(void) { // test-only version of lock escalation
m_escalator.run(this, manager_run_escalation_fun, this); void locktree_manager::run_escalation_for_test(void) {
}
void locktree::manager::run_escalation_for_test(void) {
run_escalation(); run_escalation();
} }
void locktree::manager::escalate_all_locktrees(void) { void locktree_manager::escalate_all_locktrees(void) {
uint64_t t0 = toku_current_time_microsec(); uint64_t t0 = toku_current_time_microsec();
// get all locktrees // get all locktrees
...@@ -340,47 +331,25 @@ void locktree::manager::escalate_all_locktrees(void) { ...@@ -340,47 +331,25 @@ void locktree::manager::escalate_all_locktrees(void) {
add_escalator_wait_time(t1 - t0); add_escalator_wait_time(t1 - t0);
} }
void locktree::manager::memory_tracker::set_manager(manager *mgr) { void locktree_manager::note_mem_used(uint64_t mem_used) {
m_mgr = mgr; (void) toku_sync_fetch_and_add(&m_current_lock_memory, mem_used);
}
locktree::manager *locktree::manager::memory_tracker::get_manager(void) {
return m_mgr;
}
int locktree::manager::memory_tracker::check_current_lock_constraints(void) {
int r = 0;
// check if we're out of locks without the mutex first. then, grab the
// mutex and check again. if we're still out of locks, run escalation.
// return an error if we're still out of locks after escalation.
if (out_of_locks()) {
m_mgr->run_escalation();
if (out_of_locks()) {
r = TOKUDB_OUT_OF_LOCKS;
}
}
return r;
}
void locktree::manager::memory_tracker::note_mem_used(uint64_t mem_used) {
(void) toku_sync_fetch_and_add(&m_mgr->m_current_lock_memory, mem_used);
} }
void locktree::manager::memory_tracker::note_mem_released(uint64_t mem_released) { void locktree_manager::note_mem_released(uint64_t mem_released) {
uint64_t old_mem_used = toku_sync_fetch_and_sub(&m_mgr->m_current_lock_memory, mem_released); uint64_t old_mem_used = toku_sync_fetch_and_sub(&m_current_lock_memory, mem_released);
invariant(old_mem_used >= mem_released); invariant(old_mem_used >= mem_released);
} }
bool locktree::manager::memory_tracker::out_of_locks(void) const { bool locktree_manager::out_of_locks(void) const {
return m_mgr->m_current_lock_memory >= m_mgr->m_max_lock_memory; return m_current_lock_memory >= m_max_lock_memory;
} }
bool locktree::manager::memory_tracker::over_big_threshold(void) { bool locktree_manager::over_big_threshold(void) {
return m_mgr->m_current_lock_memory >= m_mgr->m_max_lock_memory / 2; return m_current_lock_memory >= m_max_lock_memory / 2;
} }
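// accounting sketch (assumption about the caller; row_lock_size is a
// stand-in for the lock's memory footprint): the locktree reports lock
// memory to its manager as row locks come and go, keeping the atomic counter
// consistent with what out_of_locks() and over_big_threshold() read:
//
//     m_mgr->note_mem_used(row_lock_size);      // after inserting a row lock
//     m_mgr->note_mem_released(row_lock_size);  // after removing it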
int locktree::manager::iterate_pending_lock_requests( int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callback callback,
lock_request_iterate_callback callback, void *extra) { void *extra) {
mutex_lock(); mutex_lock();
int r = 0; int r = 0;
size_t num_locktrees = m_locktree_map.size(); size_t num_locktrees = m_locktree_map.size();
...@@ -389,7 +358,7 @@ int locktree::manager::iterate_pending_lock_requests( ...@@ -389,7 +358,7 @@ int locktree::manager::iterate_pending_lock_requests(
r = m_locktree_map.fetch(i, &lt); r = m_locktree_map.fetch(i, &lt);
invariant_zero(r); invariant_zero(r);
struct lt_lock_request_info *info = &lt->m_lock_request_info; struct lt_lock_request_info *info = lt->get_lock_request_info();
toku_mutex_lock(&info->mutex); toku_mutex_lock(&info->mutex);
size_t num_requests = info->pending_lock_requests.size(); size_t num_requests = info->pending_lock_requests.size();
...@@ -397,7 +366,7 @@ int locktree::manager::iterate_pending_lock_requests( ...@@ -397,7 +366,7 @@ int locktree::manager::iterate_pending_lock_requests(
lock_request *req; lock_request *req;
r = info->pending_lock_requests.fetch(k, &req); r = info->pending_lock_requests.fetch(k, &req);
invariant_zero(r); invariant_zero(r);
r = callback(lt->m_dict_id, req->get_txnid(), r = callback(lt->get_dict_id(), req->get_txnid(),
req->get_left_key(), req->get_right_key(), req->get_left_key(), req->get_right_key(),
req->get_conflicting_txnid(), req->get_start_time(), extra); req->get_conflicting_txnid(), req->get_start_time(), extra);
} }
...@@ -408,21 +377,25 @@ int locktree::manager::iterate_pending_lock_requests( ...@@ -408,21 +377,25 @@ int locktree::manager::iterate_pending_lock_requests(
return r; return r;
} }
int locktree::manager::check_current_lock_constraints(bool big_txn) { int locktree_manager::check_current_lock_constraints(bool big_txn) {
int r = 0; int r = 0;
if (big_txn && m_mem_tracker.over_big_threshold()) { if (big_txn && over_big_threshold()) {
run_escalation(); run_escalation();
if (m_mem_tracker.over_big_threshold()) { if (over_big_threshold()) {
r = TOKUDB_OUT_OF_LOCKS; r = TOKUDB_OUT_OF_LOCKS;
} }
} }
if (r == 0) { if (r == 0 && out_of_locks()) {
r = m_mem_tracker.check_current_lock_constraints(); run_escalation();
if (out_of_locks()) {
// return an error if we're still out of locks after escalation.
r = TOKUDB_OUT_OF_LOCKS;
}
} }
return r; return r;
} }
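// caller sketch (assumed): the locktree consults the manager before
// inserting a new lock, so memory pressure triggers escalation at most once
// per attempt:
//
//     int r = m_mgr ? m_mgr->check_current_lock_constraints(big_txn) : 0;
//     if (r == 0) {
//         r = acquire_lock(is_write_request, txnid,
//                          left_key, right_key, conflicts);
//     }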
void locktree::manager::escalator_init(void) { void locktree_manager::escalator_init(void) {
ZERO_STRUCT(m_escalation_mutex); ZERO_STRUCT(m_escalation_mutex);
toku_mutex_init(&m_escalation_mutex, nullptr); toku_mutex_init(&m_escalation_mutex, nullptr);
m_escalation_count = 0; m_escalation_count = 0;
...@@ -435,12 +408,12 @@ void locktree::manager::escalator_init(void) { ...@@ -435,12 +408,12 @@ void locktree::manager::escalator_init(void) {
m_escalator.create(); m_escalator.create();
} }
void locktree::manager::escalator_destroy(void) { void locktree_manager::escalator_destroy(void) {
m_escalator.destroy(); m_escalator.destroy();
toku_mutex_destroy(&m_escalation_mutex); toku_mutex_destroy(&m_escalation_mutex);
} }
void locktree::manager::add_escalator_wait_time(uint64_t t) { void locktree_manager::add_escalator_wait_time(uint64_t t) {
toku_mutex_lock(&m_escalation_mutex); toku_mutex_lock(&m_escalation_mutex);
m_wait_escalation_count += 1; m_wait_escalation_count += 1;
m_wait_escalation_time += t; m_wait_escalation_time += t;
...@@ -451,7 +424,7 @@ void locktree::manager::add_escalator_wait_time(uint64_t t) { ...@@ -451,7 +424,7 @@ void locktree::manager::add_escalator_wait_time(uint64_t t) {
toku_mutex_unlock(&m_escalation_mutex); toku_mutex_unlock(&m_escalation_mutex);
} }
void locktree::manager::escalate_locktrees(locktree **locktrees, int num_locktrees) { void locktree_manager::escalate_locktrees(locktree **locktrees, int num_locktrees) {
// there are too many row locks in the system and we need to tidy up. // there are too many row locks in the system and we need to tidy up.
// //
// a simple implementation of escalation does not attempt // a simple implementation of escalation does not attempt
...@@ -474,43 +447,24 @@ void locktree::manager::escalate_locktrees(locktree **locktrees, int num_locktre ...@@ -474,43 +447,24 @@ void locktree::manager::escalate_locktrees(locktree **locktrees, int num_locktre
} }
struct escalate_args { struct escalate_args {
locktree::manager *mgr; locktree_manager *mgr;
locktree **locktrees; locktree **locktrees;
int num_locktrees; int num_locktrees;
}; };
static void manager_escalate_locktrees(void *extra) { void locktree_manager::locktree_escalator::create(void) {
escalate_args *args = (escalate_args *) extra;
args->mgr->escalate_locktrees(args->locktrees, args->num_locktrees);
}
void locktree::manager::escalate_lock_trees_for_txn(TXNID txnid UU(), locktree *lt UU()) {
// get lock trees for txnid
const int num_locktrees = 1;
locktree *locktrees[1] = { lt };
reference_lt(lt);
// escalate these lock trees
locktree::escalator this_escalator;
this_escalator.create();
escalate_args args = { this, locktrees, num_locktrees };
this_escalator.run(this, manager_escalate_locktrees, &args);
this_escalator.destroy();
}
void locktree::escalator::create(void) {
ZERO_STRUCT(m_escalator_mutex); ZERO_STRUCT(m_escalator_mutex);
toku_mutex_init(&m_escalator_mutex, nullptr); toku_mutex_init(&m_escalator_mutex, nullptr);
toku_cond_init(&m_escalator_done, nullptr); toku_cond_init(&m_escalator_done, nullptr);
m_escalator_running = false; m_escalator_running = false;
} }
void locktree::escalator::destroy(void) { void locktree_manager::locktree_escalator::destroy(void) {
toku_cond_destroy(&m_escalator_done); toku_cond_destroy(&m_escalator_done);
toku_mutex_destroy(&m_escalator_mutex); toku_mutex_destroy(&m_escalator_mutex);
} }
void locktree::escalator::run(locktree::manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra) { void locktree_manager::locktree_escalator::run(locktree_manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra) {
uint64_t t0 = toku_current_time_microsec(); uint64_t t0 = toku_current_time_microsec();
toku_mutex_lock(&m_escalator_mutex); toku_mutex_lock(&m_escalator_mutex);
if (!m_escalator_running) { if (!m_escalator_running) {
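// sketch of the single-runner protocol (assumed continuation of the elided
// body): the first thread runs escalation outside the mutex and broadcasts;
// threads that arrive while it runs wait and piggyback on its work:
//
//     if (!m_escalator_running) {
//         m_escalator_running = true;
//         toku_mutex_unlock(&m_escalator_mutex);
//         escalate_locktrees_fun(extra);          // do the work unlocked
//         toku_mutex_lock(&m_escalator_mutex);
//         m_escalator_running = false;
//         toku_cond_broadcast(&m_escalator_done);
//     } else {
//         toku_cond_wait(&m_escalator_done, &m_escalator_mutex);
//     }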
...@@ -531,7 +485,7 @@ void locktree::escalator::run(locktree::manager *mgr, void (*escalate_locktrees_ ...@@ -531,7 +485,7 @@ void locktree::escalator::run(locktree::manager *mgr, void (*escalate_locktrees_
#define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc) #define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc)
void locktree::manager::status_init(void) { void locktree_manager::status_init(void) {
STATUS_INIT(LTM_SIZE_CURRENT, LOCKTREE_MEMORY_SIZE, UINT64, "memory size", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); STATUS_INIT(LTM_SIZE_CURRENT, LOCKTREE_MEMORY_SIZE, UINT64, "memory size", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
STATUS_INIT(LTM_SIZE_LIMIT, LOCKTREE_MEMORY_SIZE_LIMIT, UINT64, "memory size limit", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); STATUS_INIT(LTM_SIZE_LIMIT, LOCKTREE_MEMORY_SIZE_LIMIT, UINT64, "memory size limit", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
STATUS_INIT(LTM_ESCALATION_COUNT, LOCKTREE_ESCALATION_NUM, UINT64, "number of times lock escalation ran", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); STATUS_INIT(LTM_ESCALATION_COUNT, LOCKTREE_ESCALATION_NUM, UINT64, "number of times lock escalation ran", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
...@@ -561,7 +515,7 @@ void locktree::manager::status_init(void) { ...@@ -561,7 +515,7 @@ void locktree::manager::status_init(void) {
#define STATUS_VALUE(x) status.status[x].value.num #define STATUS_VALUE(x) status.status[x].value.num
void locktree::manager::get_status(LTM_STATUS statp) { void locktree_manager::get_status(LTM_STATUS statp) {
if (!status.initialized) { if (!status.initialized) {
status_init(); status_init();
} }
...@@ -593,7 +547,7 @@ void locktree::manager::get_status(LTM_STATUS statp) { ...@@ -593,7 +547,7 @@ void locktree::manager::get_status(LTM_STATUS statp) {
toku_mutex_lock(&lt->m_lock_request_info.mutex); toku_mutex_lock(&lt->m_lock_request_info.mutex);
lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size(); lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size();
add_lt_counters(&lt_counters, &lt->m_lock_request_info.counters); lt_counters.add(lt->get_lock_request_info()->counters);
toku_mutex_unlock(&lt->m_lock_request_info.mutex); toku_mutex_unlock(&lt->m_lock_request_info.mutex);
sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0; sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0;
......
...@@ -117,13 +117,10 @@ static int my_killed_callback(void) { ...@@ -117,13 +117,10 @@ static int my_killed_callback(void) {
// make sure deadlocks are detected when a lock request starts // make sure deadlocks are detected when a lock request starts
void lock_request_unit_test::test_wait_time_callback(void) { void lock_request_unit_test::test_wait_time_callback(void) {
int r; int r;
locktree::manager mgr; locktree lt;
locktree *lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
lock_request request_a; lock_request request_a;
...@@ -136,12 +133,12 @@ void lock_request_unit_test::test_wait_time_callback(void) { ...@@ -136,12 +133,12 @@ void lock_request_unit_test::test_wait_time_callback(void) {
const DBT *one = get_dbt(1); const DBT *one = get_dbt(1);
// a locks 'one' // a locks 'one'
request_a.set(lt, txnid_a, one, one, lock_request::type::WRITE, false); request_a.set(&lt, txnid_a, one, one, lock_request::type::WRITE, false);
r = request_a.start(); r = request_a.start();
assert_zero(r); assert_zero(r);
// b tries to lock 'one' // b tries to lock 'one'
request_b.set(lt, txnid_b, one, one, lock_request::type::WRITE, false); request_b.set(&lt, txnid_b, one, one, lock_request::type::WRITE, false);
r = request_b.start(); r = request_b.start();
assert(r == DB_LOCK_NOTGRANTED); assert(r == DB_LOCK_NOTGRANTED);
...@@ -162,11 +159,9 @@ void lock_request_unit_test::test_wait_time_callback(void) { ...@@ -162,11 +159,9 @@ void lock_request_unit_test::test_wait_time_callback(void) {
request_b.destroy(); request_b.destroy();
release_lock_and_retry_requests(lt, txnid_a, one, one); release_lock_and_retry_requests(&lt, txnid_a, one, one);
request_a.destroy(); request_a.destroy();
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
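// note on the tests above and below: with the manager factored out, unit
// tests can create a locktree directly with a null manager instead of going
// through locktree_manager::get_lt(). minimal lifecycle sketch, assuming the
// test owns the locktree:
//
//     locktree lt;
//     lt.create(nullptr, dict_id, nullptr, compare_dbts);  // no manager
//     // ... exercise lt ...
//     lt.destroy();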
......
...@@ -114,13 +114,10 @@ static int my_killed_callback(void) { ...@@ -114,13 +114,10 @@ static int my_killed_callback(void) {
// make sure deadlocks are detected when a lock request starts // make sure deadlocks are detected when a lock request starts
void lock_request_unit_test::test_wait_time_callback(void) { void lock_request_unit_test::test_wait_time_callback(void) {
int r; int r;
locktree::manager mgr; locktree lt;
locktree *lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
lock_request request_a; lock_request request_a;
...@@ -133,12 +130,12 @@ void lock_request_unit_test::test_wait_time_callback(void) { ...@@ -133,12 +130,12 @@ void lock_request_unit_test::test_wait_time_callback(void) {
const DBT *one = get_dbt(1); const DBT *one = get_dbt(1);
// a locks 'one' // a locks 'one'
request_a.set(lt, txnid_a, one, one, lock_request::type::WRITE, false); request_a.set(&lt, txnid_a, one, one, lock_request::type::WRITE, false);
r = request_a.start(); r = request_a.start();
assert_zero(r); assert_zero(r);
// b tries to lock 'one' // b tries to lock 'one'
request_b.set(lt, txnid_b, one, one, lock_request::type::WRITE, false); request_b.set(&lt, txnid_b, one, one, lock_request::type::WRITE, false);
r = request_b.start(); r = request_b.start();
assert(r == DB_LOCK_NOTGRANTED); assert(r == DB_LOCK_NOTGRANTED);
...@@ -158,11 +155,8 @@ void lock_request_unit_test::test_wait_time_callback(void) { ...@@ -158,11 +155,8 @@ void lock_request_unit_test::test_wait_time_callback(void) {
request_b.destroy(); request_b.destroy();
release_lock_and_retry_requests(lt, txnid_a, one, one); release_lock_and_retry_requests(&lt, txnid_a, one, one);
request_a.destroy(); request_a.destroy();
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
......
...@@ -96,14 +96,13 @@ namespace toku { ...@@ -96,14 +96,13 @@ namespace toku {
// make sure deadlocks are detected when a lock request starts // make sure deadlocks are detected when a lock request starts
void lock_request_unit_test::test_start_deadlock(void) { void lock_request_unit_test::test_start_deadlock(void) {
int r; int r;
locktree::manager mgr; locktree lt;
locktree *lt;
// something short // something short
const uint64_t lock_wait_time = 10; const uint64_t lock_wait_time = 10;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
TXNID txnid_b = 2001; TXNID txnid_b = 2001;
...@@ -119,30 +118,30 @@ void lock_request_unit_test::test_start_deadlock(void) { ...@@ -119,30 +118,30 @@ void lock_request_unit_test::test_start_deadlock(void) {
const DBT *two = get_dbt(2); const DBT *two = get_dbt(2);
// start and succeed 1,1 for A and 2,2 for B. // start and succeed 1,1 for A and 2,2 for B.
request_a.set(lt, txnid_a, one, one, lock_request::type::WRITE, false); request_a.set(&lt, txnid_a, one, one, lock_request::type::WRITE, false);
r = request_a.start(); r = request_a.start();
invariant_zero(r); invariant_zero(r);
request_b.set(lt, txnid_b, two, two, lock_request::type::WRITE, false); request_b.set(&lt, txnid_b, two, two, lock_request::type::WRITE, false);
r = request_b.start(); r = request_b.start();
invariant_zero(r); invariant_zero(r);
// txnid A should not be granted a lock on 2,2, so it goes pending. // txnid A should not be granted a lock on 2,2, so it goes pending.
request_a.set(lt, txnid_a, two, two, lock_request::type::WRITE, false); request_a.set(&lt, txnid_a, two, two, lock_request::type::WRITE, false);
r = request_a.start(); r = request_a.start();
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
// if txnid B wants a lock on 1,1 it should deadlock with A // if txnid B wants a lock on 1,1 it should deadlock with A
request_b.set(lt, txnid_b, one, one, lock_request::type::WRITE, false); request_b.set(&lt, txnid_b, one, one, lock_request::type::WRITE, false);
r = request_b.start(); r = request_b.start();
invariant(r == DB_LOCK_DEADLOCK); invariant(r == DB_LOCK_DEADLOCK);
// txnid C should not deadlock on either of these - it should just time out. // txnid C should not deadlock on either of these - it should just time out.
request_c.set(lt, txnid_c, one, one, lock_request::type::WRITE, false); request_c.set(&lt, txnid_c, one, one, lock_request::type::WRITE, false);
r = request_c.start(); r = request_c.start();
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = request_c.wait(lock_wait_time); r = request_c.wait(lock_wait_time);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
request_c.set(lt, txnid_c, two, two, lock_request::type::WRITE, false); request_c.set(&lt, txnid_c, two, two, lock_request::type::WRITE, false);
r = request_c.start(); r = request_c.start();
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = request_c.wait(lock_wait_time); r = request_c.wait(lock_wait_time);
...@@ -150,17 +149,15 @@ void lock_request_unit_test::test_start_deadlock(void) { ...@@ -150,17 +149,15 @@ void lock_request_unit_test::test_start_deadlock(void) {
// release locks for A and B, then wait on A's request which should succeed // release locks for A and B, then wait on A's request which should succeed
// since B just unlocked and should have completed A's pending request. // since B just unlocked and should have completed A's pending request.
release_lock_and_retry_requests(lt, txnid_a, one, one); release_lock_and_retry_requests(&lt, txnid_a, one, one);
release_lock_and_retry_requests(lt, txnid_b, two, two); release_lock_and_retry_requests(&lt, txnid_b, two, two);
r = request_a.wait(lock_wait_time); r = request_a.wait(lock_wait_time);
invariant_zero(r); invariant_zero(r);
release_lock_and_retry_requests(lt, txnid_a, two, two); release_lock_and_retry_requests(&lt, txnid_a, two, two);
request_a.destroy(); request_a.destroy();
request_b.destroy(); request_b.destroy();
request_c.destroy(); request_c.destroy();
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
......
...@@ -97,13 +97,11 @@ namespace toku { ...@@ -97,13 +97,11 @@ namespace toku {
// stored in the lock request set as pending. // stored in the lock request set as pending.
void lock_request_unit_test::test_start_pending(void) { void lock_request_unit_test::test_start_pending(void) {
int r; int r;
locktree::manager mgr; locktree lt;
locktree *lt;
lock_request request; lock_request request;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
TXNID txnid_b = 2001; TXNID txnid_b = 2001;
...@@ -113,15 +111,15 @@ void lock_request_unit_test::test_start_pending(void) { ...@@ -113,15 +111,15 @@ void lock_request_unit_test::test_start_pending(void) {
const DBT *two = get_dbt(2); const DBT *two = get_dbt(2);
// take a range lock using txnid b // take a range lock using txnid b
r = lt->acquire_write_lock(txnid_b, zero, two, nullptr, false); r = lt.acquire_write_lock(txnid_b, zero, two, nullptr, false);
invariant_zero(r); invariant_zero(r);
locktree::lt_lock_request_info *info = lt->get_lock_request_info(); lt_lock_request_info *info = lt.get_lock_request_info();
// start a lock request for 1,1 // start a lock request for 1,1
// it should fail. the request should be stored and in the pending state. // it should fail. the request should be stored and in the pending state.
request.create(); request.create();
request.set(lt, txnid_a, one, one, lock_request::type::WRITE, false); request.set(&lt, txnid_a, one, one, lock_request::type::WRITE, false);
r = request.start(); r = request.start();
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
invariant(info->pending_lock_requests.size() == 1); invariant(info->pending_lock_requests.size() == 1);
...@@ -134,20 +132,18 @@ void lock_request_unit_test::test_start_pending(void) { ...@@ -134,20 +132,18 @@ void lock_request_unit_test::test_start_pending(void) {
invariant(compare_dbts(nullptr, &request.m_right_key_copy, one) == 0); invariant(compare_dbts(nullptr, &request.m_right_key_copy, one) == 0);
// release the range lock for txnid b // release the range lock for txnid b
locktree_unit_test::locktree_test_release_lock(lt, txnid_b, zero, two); locktree_unit_test::locktree_test_release_lock(&lt, txnid_b, zero, two);
// now retry the lock requests. // now retry the lock requests.
// it should transition the request to successfully complete. // it should transition the request to successfully complete.
lock_request::retry_all_lock_requests(lt); lock_request::retry_all_lock_requests(&lt);
invariant(info->pending_lock_requests.size() == 0); invariant(info->pending_lock_requests.size() == 0);
invariant(request.m_state == lock_request::state::COMPLETE); invariant(request.m_state == lock_request::state::COMPLETE);
invariant(request.m_complete_r == 0); invariant(request.m_complete_r == 0);
locktree_unit_test::locktree_test_release_lock(lt, txnid_a, one, one); locktree_unit_test::locktree_test_release_lock(&lt, txnid_a, one, one);
request.destroy(); request.destroy();
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
......
...@@ -98,12 +98,10 @@ static const uint64_t my_lock_wait_time = 10 * 1000; // 10 sec ...@@ -98,12 +98,10 @@ static const uint64_t my_lock_wait_time = 10 * 1000; // 10 sec
// make sure deadlocks are detected when a lock request starts // make sure deadlocks are detected when a lock request starts
void lock_request_unit_test::test_wait_time_callback(void) { void lock_request_unit_test::test_wait_time_callback(void) {
int r; int r;
locktree::manager mgr; locktree lt;
locktree *lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
lock_request request_a; lock_request request_a;
...@@ -117,12 +115,12 @@ void lock_request_unit_test::test_wait_time_callback(void) { ...@@ -117,12 +115,12 @@ void lock_request_unit_test::test_wait_time_callback(void) {
const DBT *two = get_dbt(2); const DBT *two = get_dbt(2);
// a locks 'one' // a locks 'one'
request_a.set(lt, txnid_a, one, one, lock_request::type::WRITE, false); request_a.set(&lt, txnid_a, one, one, lock_request::type::WRITE, false);
r = request_a.start(); r = request_a.start();
assert_zero(r); assert_zero(r);
// b tries to lock 'one' // b tries to lock 'one'
request_b.set(lt, txnid_b, one, two, lock_request::type::WRITE, false); request_b.set(&lt, txnid_b, one, two, lock_request::type::WRITE, false);
r = request_b.start(); r = request_b.start();
assert(r == DB_LOCK_NOTGRANTED); assert(r == DB_LOCK_NOTGRANTED);
uint64_t t_start = toku_current_time_microsec(); uint64_t t_start = toku_current_time_microsec();
...@@ -134,11 +132,8 @@ void lock_request_unit_test::test_wait_time_callback(void) { ...@@ -134,11 +132,8 @@ void lock_request_unit_test::test_wait_time_callback(void) {
assert(t_delta >= my_lock_wait_time); assert(t_delta >= my_lock_wait_time);
request_b.destroy(); request_b.destroy();
release_lock_and_retry_requests(lt, txnid_a, one, one); release_lock_and_retry_requests(&lt, txnid_a, one, one);
request_a.destroy(); request_a.destroy();
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
......
...@@ -105,11 +105,10 @@ namespace toku { ...@@ -105,11 +105,10 @@ namespace toku {
// test write lock conflicts when read or write locks exist // test write lock conflicts when read or write locks exist
// test read lock conflicts when write locks exist // test read lock conflicts when write locks exist
void locktree_unit_test::test_conflicts(void) { void locktree_unit_test::test_conflicts(void) {
locktree::manager mgr; locktree lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
int r; int r;
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
...@@ -125,8 +124,8 @@ void locktree_unit_test::test_conflicts(void) { ...@@ -125,8 +124,8 @@ void locktree_unit_test::test_conflicts(void) {
// test_run == 0 means test with read lock // test_run == 0 means test with read lock
// test_run == 1 means test with write lock // test_run == 1 means test with write lock
#define ACQUIRE_LOCK(txn, left, right, conflicts) \ #define ACQUIRE_LOCK(txn, left, right, conflicts) \
test_run == 0 ? lt->acquire_read_lock(txn, left, right, conflicts, false) \ test_run == 0 ? lt.acquire_read_lock(txn, left, right, conflicts, false) \
: lt->acquire_write_lock(txn, left, right, conflicts, false) : lt.acquire_write_lock(txn, left, right, conflicts, false)
// acquire some locks for txnid_a // acquire some locks for txnid_a
r = ACQUIRE_LOCK(txnid_a, one, one, nullptr); r = ACQUIRE_LOCK(txnid_a, one, one, nullptr);
...@@ -142,8 +141,8 @@ void locktree_unit_test::test_conflicts(void) { ...@@ -142,8 +141,8 @@ void locktree_unit_test::test_conflicts(void) {
// if test_run == 0, then read locks exist. only test write locks. // if test_run == 0, then read locks exist. only test write locks.
#define ACQUIRE_LOCK(txn, left, right, conflicts) \ #define ACQUIRE_LOCK(txn, left, right, conflicts) \
sub_test_run == 0 && test_run == 1 ? \ sub_test_run == 0 && test_run == 1 ? \
lt->acquire_read_lock(txn, left, right, conflicts, false) \ lt.acquire_read_lock(txn, left, right, conflicts, false) \
: lt->acquire_write_lock(txn, left, right, conflicts, false) : lt.acquire_write_lock(txn, left, right, conflicts, false)
// try to get point write locks for txnid_b, should fail // try to get point write locks for txnid_b, should fail
r = ACQUIRE_LOCK(txnid_b, one, one, nullptr); r = ACQUIRE_LOCK(txnid_b, one, one, nullptr);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
...@@ -162,13 +161,10 @@ void locktree_unit_test::test_conflicts(void) { ...@@ -162,13 +161,10 @@ void locktree_unit_test::test_conflicts(void) {
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); lt.remove_overlapping_locks_for_txnid(txnid_a, one, one);
lt->remove_overlapping_locks_for_txnid(txnid_a, three, four); lt.remove_overlapping_locks_for_txnid(txnid_a, three, four);
invariant(no_row_locks(lt)); invariant(no_row_locks(&lt));
} }
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
......
...@@ -95,27 +95,23 @@ namespace toku { ...@@ -95,27 +95,23 @@ namespace toku {
// test simple create and destroy of the locktree // test simple create and destroy of the locktree
void locktree_unit_test::test_create_destroy(void) { void locktree_unit_test::test_create_destroy(void) {
locktree::manager mgr; locktree lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr);
locktree::lt_lock_request_info *info = lt->get_lock_request_info(); lt.create(nullptr, dict_id, nullptr, compare_dbts);
lt_lock_request_info *info = lt.get_lock_request_info();
invariant_notnull(info); invariant_notnull(info);
toku_mutex_lock(&info->mutex); toku_mutex_lock(&info->mutex);
toku_mutex_unlock(&info->mutex); toku_mutex_unlock(&info->mutex);
invariant(lt->m_dict_id.dictid == dict_id.dictid); invariant(lt.m_dict_id.dictid == dict_id.dictid);
invariant(lt->m_reference_count == 1); invariant(lt.m_reference_count == 1);
invariant(lt->m_rangetree != nullptr); invariant(lt.m_rangetree != nullptr);
invariant(lt->m_userdata == nullptr); invariant(lt.m_userdata == nullptr);
invariant(info->pending_lock_requests.size() == 0); invariant(info->pending_lock_requests.size() == 0);
invariant(lt->m_sto_end_early_count == 0); invariant(lt.m_sto_end_early_count == 0);
invariant(lt->m_sto_end_early_time == 0); invariant(lt.m_sto_end_early_time == 0);
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
......
...@@ -119,21 +119,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64 ...@@ -119,21 +119,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64
return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn); return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn);
} }
#if 0 static void run_big_txn(locktree_manager *mgr UU(), locktree **lt, int n_lt, TXNID txn_id) {
static locktree **big_txn_lt;
static int n_big_txn_lt;
static int get_locktrees_touched_by_txn(TXNID txn_id UU(), void *txn_extra UU(), locktree ***ret_locktrees, int *ret_num_locktrees) {
locktree **locktrees = (locktree **) toku_malloc(n_big_txn_lt * sizeof (locktree *));
for (int i = 0; i < n_big_txn_lt; i++)
locktrees[i] = big_txn_lt[i];
*ret_locktrees = locktrees;
*ret_num_locktrees = n_big_txn_lt;
return 0;
}
#endif
static void run_big_txn(locktree::manager *mgr UU(), locktree **lt, int n_lt, TXNID txn_id) {
int64_t last_i = -1; int64_t last_i = -1;
for (int64_t i = 0; !killed; i++) { for (int64_t i = 0; !killed; i++) {
for (int j = 0; j < n_lt; j++) { for (int j = 0; j < n_lt; j++) {
...@@ -157,7 +143,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree **lt, int n_lt, TX ...@@ -157,7 +143,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree **lt, int n_lt, TX
} }
struct big_arg { struct big_arg {
locktree::manager *mgr; locktree_manager *mgr;
locktree **lt; locktree **lt;
int n_lt; int n_lt;
TXNID txn_id; TXNID txn_id;
...@@ -171,7 +157,7 @@ static void *big_f(void *_arg) { ...@@ -171,7 +157,7 @@ static void *big_f(void *_arg) {
return arg; return arg;
} }
static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t k) { static void run_small_txn(locktree_manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t k) {
int64_t i; int64_t i;
for (i = 0; !killed; i++) { for (i = 0; !killed; i++) {
uint64_t t_start = toku_current_time_microsec(); uint64_t t_start = toku_current_time_microsec();
...@@ -190,7 +176,7 @@ static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_i ...@@ -190,7 +176,7 @@ static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_i
} }
struct small_arg { struct small_arg {
locktree::manager *mgr; locktree_manager *mgr;
locktree *lt; locktree *lt;
TXNID txn_id; TXNID txn_id;
int64_t k; int64_t k;
...@@ -209,7 +195,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff ...@@ -209,7 +195,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff
printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra); printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra);
} }
static uint64_t get_escalation_count(locktree::manager &mgr) { static uint64_t get_escalation_count(locktree_manager &mgr) {
LTM_STATUS_S ltm_status; LTM_STATUS_S ltm_status;
mgr.get_status(&ltm_status); mgr.get_status(&ltm_status);
...@@ -251,7 +237,7 @@ int main(int argc, const char *argv[]) { ...@@ -251,7 +237,7 @@ int main(int argc, const char *argv[]) {
int r; int r;
// create a manager // create a manager
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, e_callback, nullptr); mgr.create(nullptr, nullptr, e_callback, nullptr);
mgr.set_max_lock_memory(max_lock_memory); mgr.set_max_lock_memory(max_lock_memory);
...@@ -264,11 +250,6 @@ int main(int argc, const char *argv[]) { ...@@ -264,11 +250,6 @@ int main(int argc, const char *argv[]) {
big_lt[i] = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); big_lt[i] = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr);
} }
#if 0
big_txn_lt = big_lt;
n_big_txn_lt = n_big;
#endif
dict_id = { next_dict_id }; next_dict_id++; dict_id = { next_dict_id }; next_dict_id++;
locktree *small_lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr); locktree *small_lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr);
......
...@@ -118,7 +118,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64 ...@@ -118,7 +118,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64
return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn); return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn);
} }
static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t start_i) { static void run_big_txn(locktree_manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t start_i) {
fprintf(stderr, "%u run_big_txn %p %" PRIu64 " %" PRId64 "\n", toku_os_gettid(), lt, txn_id, start_i); fprintf(stderr, "%u run_big_txn %p %" PRIu64 " %" PRId64 "\n", toku_os_gettid(), lt, txn_id, start_i);
int64_t last_i = -1; int64_t last_i = -1;
for (int64_t i = start_i; !killed; i++) { for (int64_t i = start_i; !killed; i++) {
...@@ -141,7 +141,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, ...@@ -141,7 +141,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id,
} }
struct arg { struct arg {
locktree::manager *mgr; locktree_manager *mgr;
locktree *lt; locktree *lt;
TXNID txn_id; TXNID txn_id;
int64_t start_i; int64_t start_i;
...@@ -158,7 +158,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff ...@@ -158,7 +158,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff
printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra); printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra);
} }
static uint64_t get_escalation_count(locktree::manager &mgr) { static uint64_t get_escalation_count(locktree_manager &mgr) {
LTM_STATUS_S ltm_status; LTM_STATUS_S ltm_status;
mgr.get_status(&ltm_status); mgr.get_status(&ltm_status);
...@@ -205,7 +205,7 @@ int main(int argc, const char *argv[]) { ...@@ -205,7 +205,7 @@ int main(int argc, const char *argv[]) {
int r; int r;
// create a manager // create a manager
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, e_callback, nullptr); mgr.create(nullptr, nullptr, e_callback, nullptr);
mgr.set_max_lock_memory(max_lock_memory); mgr.set_max_lock_memory(max_lock_memory);
......
...@@ -118,7 +118,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64 ...@@ -118,7 +118,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64
return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn); return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn);
} }
static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t start_i) { static void run_big_txn(locktree_manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t start_i) {
fprintf(stderr, "%u run_big_txn %p %" PRIu64 " %" PRId64 "\n", toku_os_gettid(), lt, txn_id, start_i); fprintf(stderr, "%u run_big_txn %p %" PRIu64 " %" PRId64 "\n", toku_os_gettid(), lt, txn_id, start_i);
int64_t last_i = -1; int64_t last_i = -1;
for (int64_t i = start_i; !killed; i++) { for (int64_t i = start_i; !killed; i++) {
...@@ -141,7 +141,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, ...@@ -141,7 +141,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id,
} }
struct arg { struct arg {
locktree::manager *mgr; locktree_manager *mgr;
locktree *lt; locktree *lt;
TXNID txn_id; TXNID txn_id;
int64_t start_i; int64_t start_i;
...@@ -158,7 +158,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff ...@@ -158,7 +158,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff
printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra); printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra);
} }
static uint64_t get_escalation_count(locktree::manager &mgr) { static uint64_t get_escalation_count(locktree_manager &mgr) {
LTM_STATUS_S ltm_status; LTM_STATUS_S ltm_status;
mgr.get_status(&ltm_status); mgr.get_status(&ltm_status);
...@@ -205,7 +205,7 @@ int main(int argc, const char *argv[]) { ...@@ -205,7 +205,7 @@ int main(int argc, const char *argv[]) {
int r; int r;
// create a manager // create a manager
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, e_callback, nullptr); mgr.create(nullptr, nullptr, e_callback, nullptr);
mgr.set_max_lock_memory(max_lock_memory); mgr.set_max_lock_memory(max_lock_memory);
......
...@@ -123,7 +123,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff ...@@ -123,7 +123,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff
printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra); printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra);
} }
static uint64_t get_escalation_count(locktree::manager &mgr) { static uint64_t get_escalation_count(locktree_manager &mgr) {
LTM_STATUS_S ltm_status; LTM_STATUS_S ltm_status;
mgr.get_status(&ltm_status); mgr.get_status(&ltm_status);
...@@ -159,7 +159,7 @@ int main(int argc, const char *argv[]) { ...@@ -159,7 +159,7 @@ int main(int argc, const char *argv[]) {
int r; int r;
// create a manager // create a manager
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, e_callback, nullptr); mgr.create(nullptr, nullptr, e_callback, nullptr);
mgr.set_max_lock_memory(max_lock_memory); mgr.set_max_lock_memory(max_lock_memory);
......
...@@ -126,7 +126,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64 ...@@ -126,7 +126,7 @@ static int locktree_write_lock(locktree *lt, TXNID txn_id, int64_t left_k, int64
return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn); return lt->acquire_write_lock(txn_id, &left, &right, nullptr, big_txn);
} }
static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id) { static void run_big_txn(locktree_manager *mgr UU(), locktree *lt, TXNID txn_id) {
int64_t last_i = -1; int64_t last_i = -1;
for (int64_t i = 0; !killed; i++) { for (int64_t i = 0; !killed; i++) {
uint64_t t_start = toku_current_time_microsec(); uint64_t t_start = toku_current_time_microsec();
...@@ -144,7 +144,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id) ...@@ -144,7 +144,7 @@ static void run_big_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id)
locktree_release_lock(lt, txn_id, 0, last_i); // release the range 0 .. last_i locktree_release_lock(lt, txn_id, 0, last_i); // release the range 0 .. last_i
} }
static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t k) { static void run_small_txn(locktree_manager *mgr UU(), locktree *lt, TXNID txn_id, int64_t k) {
for (int64_t i = 0; !killed; i++) { for (int64_t i = 0; !killed; i++) {
uint64_t t_start = toku_current_time_microsec(); uint64_t t_start = toku_current_time_microsec();
int r = locktree_write_lock(lt, txn_id, k, k, false); int r = locktree_write_lock(lt, txn_id, k, k, false);
...@@ -160,7 +160,7 @@ static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_i ...@@ -160,7 +160,7 @@ static void run_small_txn(locktree::manager *mgr UU(), locktree *lt, TXNID txn_i
} }
struct arg { struct arg {
locktree::manager *mgr; locktree_manager *mgr;
locktree *lt; locktree *lt;
TXNID txn_id; TXNID txn_id;
int64_t k; int64_t k;
...@@ -183,7 +183,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff ...@@ -183,7 +183,7 @@ static void e_callback(TXNID txnid, const locktree *lt, const range_buffer &buff
printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra); printf("%u %s %" PRIu64 " %p %d %p\n", toku_os_gettid(), __FUNCTION__, txnid, lt, buffer.get_num_ranges(), extra);
} }
static uint64_t get_escalation_count(locktree::manager &mgr) { static uint64_t get_escalation_count(locktree_manager &mgr) {
LTM_STATUS_S ltm_status; LTM_STATUS_S ltm_status;
mgr.get_status(&ltm_status); mgr.get_status(&ltm_status);
...@@ -223,7 +223,7 @@ int main(int argc, const char *argv[]) { ...@@ -223,7 +223,7 @@ int main(int argc, const char *argv[]) {
int r; int r;
// create a manager // create a manager
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, e_callback, nullptr); mgr.create(nullptr, nullptr, e_callback, nullptr);
mgr.set_max_lock_memory(max_lock_memory); mgr.set_max_lock_memory(max_lock_memory);
......
...@@ -95,11 +95,10 @@ namespace toku { ...@@ -95,11 +95,10 @@ namespace toku {
// test that ranges with infinite endpoints work // test that ranges with infinite endpoints work
void locktree_unit_test::test_infinity(void) { void locktree_unit_test::test_infinity(void) {
locktree::manager mgr; locktree lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
int r; int r;
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
...@@ -112,60 +111,57 @@ void locktree_unit_test::test_infinity(void) { ...@@ -112,60 +111,57 @@ void locktree_unit_test::test_infinity(void) {
const DBT max_int = max_dbt(); const DBT max_int = max_dbt();
// txn A will lock -inf, 5. // txn A will lock -inf, 5.
r = lt->acquire_write_lock(txnid_a, toku_dbt_negative_infinity(), five, nullptr, false); r = lt.acquire_write_lock(txnid_a, toku_dbt_negative_infinity(), five, nullptr, false);
invariant(r == 0); invariant(r == 0);
// txn B will fail to get any lock <= 5, even min_int // txn B will fail to get any lock <= 5, even min_int
r = lt->acquire_write_lock(txnid_b, five, five, nullptr, false); r = lt.acquire_write_lock(txnid_b, five, five, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, zero, one, nullptr, false); r = lt.acquire_write_lock(txnid_b, zero, one, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, &min_int, &min_int, nullptr, false); r = lt.acquire_write_lock(txnid_b, &min_int, &min_int, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, toku_dbt_negative_infinity(), &min_int, nullptr, false); r = lt.acquire_write_lock(txnid_b, toku_dbt_negative_infinity(), &min_int, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
lt->remove_overlapping_locks_for_txnid(txnid_a, toku_dbt_negative_infinity(), five); lt.remove_overlapping_locks_for_txnid(txnid_a, toku_dbt_negative_infinity(), five);
// txn A will lock 1, +inf // txn A will lock 1, +inf
r = lt->acquire_write_lock(txnid_a, one, toku_dbt_positive_infinity(), nullptr, false); r = lt.acquire_write_lock(txnid_a, one, toku_dbt_positive_infinity(), nullptr, false);
invariant(r == 0); invariant(r == 0);
// txn B will fail to get any lock >= 1, even max_int // txn B will fail to get any lock >= 1, even max_int
r = lt->acquire_write_lock(txnid_b, one, one, nullptr, false); r = lt.acquire_write_lock(txnid_b, one, one, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, two, five, nullptr, false); r = lt.acquire_write_lock(txnid_b, two, five, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, &max_int, &max_int, nullptr, false); r = lt.acquire_write_lock(txnid_b, &max_int, &max_int, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, &max_int, toku_dbt_positive_infinity(), nullptr, false); r = lt.acquire_write_lock(txnid_b, &max_int, toku_dbt_positive_infinity(), nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
lt->remove_overlapping_locks_for_txnid(txnid_a, toku_dbt_negative_infinity(), five); lt.remove_overlapping_locks_for_txnid(txnid_a, toku_dbt_negative_infinity(), five);
// txn A will lock -inf, +inf // txn A will lock -inf, +inf
r = lt->acquire_write_lock(txnid_a, toku_dbt_negative_infinity(), toku_dbt_positive_infinity(), nullptr, false); r = lt.acquire_write_lock(txnid_a, toku_dbt_negative_infinity(), toku_dbt_positive_infinity(), nullptr, false);
invariant(r == 0); invariant(r == 0);
// txn B will fail to get any lock // txn B will fail to get any lock
r = lt->acquire_write_lock(txnid_b, zero, one, nullptr, false); r = lt.acquire_write_lock(txnid_b, zero, one, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, two, five, nullptr, false); r = lt.acquire_write_lock(txnid_b, two, five, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, &min_int, &min_int, nullptr, false); r = lt.acquire_write_lock(txnid_b, &min_int, &min_int, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, &min_int, &max_int, nullptr, false); r = lt.acquire_write_lock(txnid_b, &min_int, &max_int, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, &max_int, &max_int, nullptr, false); r = lt.acquire_write_lock(txnid_b, &max_int, &max_int, nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, toku_dbt_negative_infinity(), toku_dbt_negative_infinity(), nullptr, false); r = lt.acquire_write_lock(txnid_b, toku_dbt_negative_infinity(), toku_dbt_negative_infinity(), nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, toku_dbt_negative_infinity(), toku_dbt_positive_infinity(), nullptr, false); r = lt.acquire_write_lock(txnid_b, toku_dbt_negative_infinity(), toku_dbt_positive_infinity(), nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
r = lt->acquire_write_lock(txnid_b, toku_dbt_positive_infinity(), toku_dbt_positive_infinity(), nullptr, false); r = lt.acquire_write_lock(txnid_b, toku_dbt_positive_infinity(), toku_dbt_positive_infinity(), nullptr, false);
invariant(r == DB_LOCK_NOTGRANTED); invariant(r == DB_LOCK_NOTGRANTED);
lt->remove_overlapping_locks_for_txnid(txnid_a, toku_dbt_negative_infinity(), toku_dbt_positive_infinity()); lt.remove_overlapping_locks_for_txnid(txnid_a, toku_dbt_negative_infinity(), toku_dbt_positive_infinity());
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
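With the manager factored out into locktree_manager, tests that exercise a single locktree no longer need one at all: they construct the locktree on the stack and call create()/destroy() directly, as the rewritten test above does. A minimal sketch of the new lifecycle, assuming the toku locktree headers and a compare_dbts comparator like the one these tests supply:

    toku::locktree lt;
    DICTIONARY_ID dict_id = { 1 };
    // create(manager, dict_id, descriptor, comparator) -- a null manager
    // and a null descriptor are fine for standalone use, as in the tests.
    lt.create(nullptr, dict_id, nullptr, compare_dbts);
    // ... acquire and release locks against lt directly ...
    lt.destroy();

The manager-mediated path through mgr.get_lt()/mgr.release_lt() remains for callers that want shared, reference-counted locktrees; see test_simple_lock below.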
......
...@@ -107,18 +107,16 @@ static int my_compare_dbts(DB *db, const DBT *a, const DBT *b) { ...@@ -107,18 +107,16 @@ static int my_compare_dbts(DB *db, const DBT *a, const DBT *b) {
// test that get/set userdata works, and that get_manager() works // test that get/set userdata works, and that get_manager() works
void locktree_unit_test::test_misc(void) { void locktree_unit_test::test_misc(void) {
locktree::manager mgr; locktree lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, my_compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, my_compare_dbts);
invariant(lt->get_userdata() == nullptr); invariant(lt.get_userdata() == nullptr);
int userdata; int userdata;
lt->set_userdata(&userdata); lt.set_userdata(&userdata);
invariant(lt->get_userdata() == &userdata); invariant(lt.get_userdata() == &userdata);
lt->set_userdata(nullptr); lt.set_userdata(nullptr);
invariant(lt->get_userdata() == nullptr); invariant(lt.get_userdata() == nullptr);
int r; int r;
DBT dbt_a, dbt_b; DBT dbt_a, dbt_b;
...@@ -128,17 +126,14 @@ void locktree_unit_test::test_misc(void) { ...@@ -128,17 +126,14 @@ void locktree_unit_test::test_misc(void) {
// make sure the comparator object has the correct // make sure the comparator object has the correct
// descriptor when we set the locktree's descriptor // descriptor when we set the locktree's descriptor
lt->set_descriptor(&d1); lt.set_descriptor(&d1);
expected_descriptor = &d1; expected_descriptor = &d1;
r = lt->m_cmp->compare(&dbt_a, &dbt_b); r = lt.m_cmp->compare(&dbt_a, &dbt_b);
invariant(r == expected_comparison_magic); invariant(r == expected_comparison_magic);
lt->set_descriptor(&d2); lt.set_descriptor(&d2);
expected_descriptor = &d2; expected_descriptor = &d2;
r = lt->m_cmp->compare(&dbt_a, &dbt_b); r = lt.m_cmp->compare(&dbt_a, &dbt_b);
invariant(r == expected_comparison_magic); invariant(r == expected_comparison_magic);
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
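The userdata slot the test pokes at is opaque to the locktree itself; it is a natural place for callers, such as a manager's create callback, to hang per-locktree state. A short sketch, with the state struct a hypothetical stand-in:

    struct my_lt_state { int open_handles; };  // hypothetical per-locktree state
    my_lt_state state = { 0 };
    lt.set_userdata(&state);
    // ... later, recover it from the same locktree:
    my_lt_state *s = static_cast<my_lt_state *>(lt.get_userdata());
    s->open_handles++;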
......
...@@ -98,11 +98,10 @@ namespace toku { ...@@ -98,11 +98,10 @@ namespace toku {
// write locks if overlapping and ensure that existing read // write locks if overlapping and ensure that existing read
// or write locks are consolidated by overlapping relocks. // or write locks are consolidated by overlapping relocks.
void locktree_unit_test::test_overlapping_relock(void) { void locktree_unit_test::test_overlapping_relock(void) {
locktree::manager mgr; locktree lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
const DBT *zero = get_dbt(0); const DBT *zero = get_dbt(0);
const DBT *one = get_dbt(1); const DBT *one = get_dbt(1);
...@@ -121,15 +120,15 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -121,15 +120,15 @@ void locktree_unit_test::test_overlapping_relock(void) {
// do something. at the end of the test, we release 100, 100. // do something. at the end of the test, we release 100, 100.
const TXNID the_other_txnid = 9999; const TXNID the_other_txnid = 9999;
const DBT *hundred = get_dbt(100); const DBT *hundred = get_dbt(100);
r = lt->acquire_write_lock(the_other_txnid, hundred, hundred, nullptr, false); r = lt.acquire_write_lock(the_other_txnid, hundred, hundred, nullptr, false);
invariant(r == 0); invariant(r == 0);
for (int test_run = 0; test_run < 2; test_run++) { for (int test_run = 0; test_run < 2; test_run++) {
// test_run == 0 means test with read lock // test_run == 0 means test with read lock
// test_run == 1 means test with write lock // test_run == 1 means test with write lock
#define ACQUIRE_LOCK(txn, left, right, conflicts) \ #define ACQUIRE_LOCK(txn, left, right, conflicts) \
test_run == 0 ? lt->acquire_read_lock(txn, left, right, conflicts, false) \ test_run == 0 ? lt.acquire_read_lock(txn, left, right, conflicts, false) \
: lt->acquire_write_lock(txn, left, right, conflicts, false) : lt.acquire_write_lock(txn, left, right, conflicts, false)
// lock [1,1] and [2,2]. then lock [1,2]. // lock [1,1] and [2,2]. then lock [1,2].
// ensure only [1,2] exists in the tree // ensure only [1,2] exists in the tree
...@@ -157,10 +156,10 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -157,10 +156,10 @@ void locktree_unit_test::test_overlapping_relock(void) {
return true; return true;
} }
} verify_fn; } verify_fn;
verify_fn.cmp = lt->m_cmp; verify_fn.cmp = lt.m_cmp;
#define do_verify() \ #define do_verify() \
do { verify_fn.saw_the_other = false; locktree_iterate<verify_fn_obj>(lt, &verify_fn); } while (0) do { verify_fn.saw_the_other = false; locktree_iterate<verify_fn_obj>(&lt, &verify_fn); } while (0)
keyrange range; keyrange range;
range.create(one, two); range.create(one, two);
...@@ -170,9 +169,9 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -170,9 +169,9 @@ void locktree_unit_test::test_overlapping_relock(void) {
// unlocking [1,1] should remove the only range, // unlocking [1,1] should remove the only range,
// the other unlocks should do nothing. // the other unlocks should do nothing.
lt->remove_overlapping_locks_for_txnid(txnid_a, one, one); lt.remove_overlapping_locks_for_txnid(txnid_a, one, one);
lt->remove_overlapping_locks_for_txnid(txnid_a, two, two); lt.remove_overlapping_locks_for_txnid(txnid_a, two, two);
lt->remove_overlapping_locks_for_txnid(txnid_a, one, two); lt.remove_overlapping_locks_for_txnid(txnid_a, one, two);
// try overlapping from the right // try overlapping from the right
r = ACQUIRE_LOCK(txnid_a, one, three, nullptr); r = ACQUIRE_LOCK(txnid_a, one, three, nullptr);
...@@ -197,16 +196,13 @@ void locktree_unit_test::test_overlapping_relock(void) { ...@@ -197,16 +196,13 @@ void locktree_unit_test::test_overlapping_relock(void) {
do_verify(); do_verify();
// release one of the locks we acquired. this should clean up the whole range. // release one of the locks we acquired. this should clean up the whole range.
lt->remove_overlapping_locks_for_txnid(txnid_a, zero, four); lt.remove_overlapping_locks_for_txnid(txnid_a, zero, four);
#undef ACQUIRE_LOCK #undef ACQUIRE_LOCK
} }
// remove the other txnid's lock now // remove the other txnid's lock now
lt->remove_overlapping_locks_for_txnid(the_other_txnid, hundred, hundred); lt.remove_overlapping_locks_for_txnid(the_other_txnid, hundred, hundred);
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
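The consolidation behavior this test pins down is worth seeing in isolation: locking two adjacent points and then a range that covers both leaves a single range in the tree, so one remove call releases everything. A condensed sketch, reusing the get_dbt/compare_dbts helpers from the test:

    toku::locktree lt;
    DICTIONARY_ID dict_id = { 1 };
    lt.create(nullptr, dict_id, nullptr, compare_dbts);
    const TXNID txn = 1001;   // hypothetical txnid
    const DBT *one = get_dbt(1);
    const DBT *two = get_dbt(2);
    int r;
    r = lt.acquire_write_lock(txn, one, one, nullptr, false);  invariant_zero(r);
    r = lt.acquire_write_lock(txn, two, two, nullptr, false);  invariant_zero(r);
    // relocking the covering range [1,2] consolidates [1,1] and [2,2]
    r = lt.acquire_write_lock(txn, one, two, nullptr, false);  invariant_zero(r);
    lt.remove_overlapping_locks_for_txnid(txn, one, two);      // releases it all
    lt.destroy();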
......
...@@ -95,11 +95,11 @@ namespace toku { ...@@ -95,11 +95,11 @@ namespace toku {
// test simple, non-overlapping read locks and then write locks // test simple, non-overlapping read locks and then write locks
void locktree_unit_test::test_simple_lock(void) { void locktree_unit_test::test_simple_lock(void) {
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr); mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr); locktree *lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr);
int r; int r;
TXNID txnid_a = 1001; TXNID txnid_a = 1001;
......
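Unlike the other tests, test_simple_lock keeps the manager in the loop: get_lt() hands back a locktree the manager owns (taking a reference, as the reference/release test below exercises), and release_lt() drops that reference so the manager can destroy the locktree once it is unused. A minimal sketch of that path:

    toku::locktree_manager mgr;
    mgr.create(nullptr, nullptr, nullptr, nullptr);  // no create/destroy/escalate callbacks
    DICTIONARY_ID dict_id = { 1 };
    toku::locktree *lt = mgr.get_lt(dict_id, nullptr, compare_dbts, nullptr);
    // ... acquire and release locks through lt ...
    mgr.release_lt(lt);
    mgr.destroy();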
...@@ -98,11 +98,10 @@ namespace toku { ...@@ -98,11 +98,10 @@ namespace toku {
// write locks if overlapping and ensure that existing read // write locks if overlapping and ensure that existing read
// or write locks are consolidated by overlapping relocks. // or write locks are consolidated by overlapping relocks.
void locktree_unit_test::test_single_txnid_optimization(void) { void locktree_unit_test::test_single_txnid_optimization(void) {
locktree::manager mgr; locktree lt;
mgr.create(nullptr, nullptr, nullptr, nullptr);
DESCRIPTOR desc = nullptr;
DICTIONARY_ID dict_id = { 1 }; DICTIONARY_ID dict_id = { 1 };
locktree *lt = mgr.get_lt(dict_id, desc, compare_dbts, nullptr); lt.create(nullptr, dict_id, nullptr, compare_dbts);
const DBT *zero = get_dbt(0); const DBT *zero = get_dbt(0);
const DBT *one = get_dbt(1); const DBT *one = get_dbt(1);
...@@ -124,13 +123,13 @@ void locktree_unit_test::test_single_txnid_optimization(void) { ...@@ -124,13 +123,13 @@ void locktree_unit_test::test_single_txnid_optimization(void) {
buffer.create(); buffer.create();
#define lock_and_append_point_for_txnid_a(key) \ #define lock_and_append_point_for_txnid_a(key) \
r = lt->acquire_write_lock(txnid_a, key, key, nullptr, false); \ r = lt.acquire_write_lock(txnid_a, key, key, nullptr, false); \
invariant_zero(r); \ invariant_zero(r); \
buffer.append(key, key); buffer.append(key, key);
#define maybe_point_locks_for_txnid_b(i) \ #define maybe_point_locks_for_txnid_b(i) \
if (where == i) { \ if (where == i) { \
r = lt->acquire_write_lock(txnid_b, one, one, nullptr, false); \ r = lt.acquire_write_lock(txnid_b, one, one, nullptr, false); \
invariant_zero(r); \ invariant_zero(r); \
} }
...@@ -143,7 +142,7 @@ void locktree_unit_test::test_single_txnid_optimization(void) { ...@@ -143,7 +142,7 @@ void locktree_unit_test::test_single_txnid_optimization(void) {
lock_and_append_point_for_txnid_a(zero); lock_and_append_point_for_txnid_a(zero);
maybe_point_locks_for_txnid_b(2); maybe_point_locks_for_txnid_b(2);
lt->release_locks(txnid_a, &buffer); lt.release_locks(txnid_a, &buffer);
// txnid b does not take a lock on iteration 3 // txnid b does not take a lock on iteration 3
if (where != 3) { if (where != 3) {
...@@ -158,21 +157,18 @@ void locktree_unit_test::test_single_txnid_optimization(void) { ...@@ -158,21 +157,18 @@ void locktree_unit_test::test_single_txnid_optimization(void) {
return true; return true;
} }
} verify_fn; } verify_fn;
verify_fn.cmp = lt->m_cmp; verify_fn.cmp = lt.m_cmp;
keyrange range; keyrange range;
range.create(one, one); range.create(one, one);
verify_fn.expected_txnid = txnid_b; verify_fn.expected_txnid = txnid_b;
verify_fn.expected_range = &range; verify_fn.expected_range = &range;
locktree_iterate<verify_fn_obj>(lt, &verify_fn); locktree_iterate<verify_fn_obj>(&lt, &verify_fn);
lt->remove_overlapping_locks_for_txnid(txnid_b, one, one); lt.remove_overlapping_locks_for_txnid(txnid_b, one, one);
} }
buffer.destroy(); buffer.destroy();
} }
mgr.release_lt(lt);
mgr.destroy();
} }
} /* namespace toku */ } /* namespace toku */
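The buffer-driven release above is the same pattern the ydb layer uses further down: every successfully acquired range is appended to a range_buffer, and a single release_locks() call later releases everything the txnid accumulated. A minimal sketch:

    toku::range_buffer buffer;
    buffer.create();
    r = lt.acquire_write_lock(txnid_a, zero, zero, nullptr, false);
    invariant_zero(r);
    buffer.append(zero, zero);            // record what we locked
    // ... more acquire/append pairs ...
    lt.release_locks(txnid_a, &buffer);   // release every recorded range at once
    buffer.destroy();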
......
...@@ -94,17 +94,14 @@ PATENT RIGHTS GRANT: ...@@ -94,17 +94,14 @@ PATENT RIGHTS GRANT:
namespace toku { namespace toku {
void manager_unit_test::test_create_destroy(void) { void manager_unit_test::test_create_destroy(void) {
locktree::manager mgr; locktree_manager mgr;
locktree::manager::lt_create_cb create_callback = lt_create_cb create_callback = (lt_create_cb) (long) 1;
(locktree::manager::lt_create_cb) (long) 1; lt_destroy_cb destroy_callback = (lt_destroy_cb) (long) 2;
locktree::manager::lt_destroy_cb destroy_callback = lt_escalate_cb escalate_callback = (lt_escalate_cb) (long) 3;
(locktree::manager::lt_destroy_cb) (long) 2;
locktree::manager::lt_escalate_cb escalate_callback =
(locktree::manager::lt_escalate_cb) (long) 3;
void *extra = (void *) (long) 4; void *extra = (void *) (long) 4;
mgr.create(create_callback, destroy_callback, escalate_callback, extra); mgr.create(create_callback, destroy_callback, escalate_callback, extra);
invariant(mgr.m_max_lock_memory == locktree::manager::DEFAULT_MAX_LOCK_MEMORY); invariant(mgr.m_max_lock_memory == locktree_manager::DEFAULT_MAX_LOCK_MEMORY);
invariant(mgr.m_current_lock_memory == 0); invariant(mgr.m_current_lock_memory == 0);
invariant(mgr.m_escalation_count == 0); invariant(mgr.m_escalation_count == 0);
invariant(mgr.m_escalation_time == 0); invariant(mgr.m_escalation_time == 0);
......
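Part of the simplification shows up here: the callback types are now plain typedefs in namespace toku rather than nested under locktree::manager, so each declaration shrinks from three lines to one. A sketch of wiring a manager with the flat names (null callbacks, as most of these tests pass):

    toku::lt_create_cb on_create = nullptr;      // invoked when the manager creates a locktree
    toku::lt_destroy_cb on_destroy = nullptr;    // invoked when the manager destroys one
    toku::lt_escalate_cb on_escalate = nullptr;  // invoked when lock escalation runs
    toku::locktree_manager mgr;
    mgr.create(on_create, on_destroy, on_escalate, nullptr);
    // ...
    mgr.destroy();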
...@@ -94,7 +94,7 @@ PATENT RIGHTS GRANT: ...@@ -94,7 +94,7 @@ PATENT RIGHTS GRANT:
namespace toku { namespace toku {
void manager_unit_test::test_lt_map(void) { void manager_unit_test::test_lt_map(void) {
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr); mgr.create(nullptr, nullptr, nullptr, nullptr);
locktree aa; locktree aa;
......
...@@ -95,7 +95,7 @@ namespace toku { ...@@ -95,7 +95,7 @@ namespace toku {
void manager_unit_test::test_params(void) { void manager_unit_test::test_params(void) {
int r; int r;
locktree::manager mgr; locktree_manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr); mgr.create(nullptr, nullptr, nullptr, nullptr);
uint64_t new_max_lock_memory = 15307752356; uint64_t new_max_lock_memory = 15307752356;
......
...@@ -108,7 +108,7 @@ static void destroy_cb(locktree *lt) { ...@@ -108,7 +108,7 @@ static void destroy_cb(locktree *lt) {
} }
void manager_unit_test::test_reference_release_lt(void) { void manager_unit_test::test_reference_release_lt(void) {
locktree::manager mgr; locktree_manager mgr;
mgr.create(create_cb, destroy_cb, nullptr, nullptr); mgr.create(create_cb, destroy_cb, nullptr, nullptr);
DICTIONARY_ID a = { 0 }; DICTIONARY_ID a = { 0 };
......
...@@ -112,8 +112,7 @@ static void assert_status(LTM_STATUS ltm_status, const char *keyname, uint64_t v ...@@ -112,8 +112,7 @@ static void assert_status(LTM_STATUS ltm_status, const char *keyname, uint64_t v
} }
void manager_unit_test::test_status(void) { void manager_unit_test::test_status(void) {
locktree_manager mgr;
locktree::manager mgr;
mgr.create(nullptr, nullptr, nullptr, nullptr); mgr.create(nullptr, nullptr, nullptr, nullptr);
LTM_STATUS_S status; LTM_STATUS_S status;
......
...@@ -150,7 +150,7 @@ struct __toku_db_env_internal { ...@@ -150,7 +150,7 @@ struct __toku_db_env_internal {
unsigned long cachetable_size; unsigned long cachetable_size;
CACHETABLE cachetable; CACHETABLE cachetable;
TOKULOGGER logger; TOKULOGGER logger;
toku::locktree::manager ltm; toku::locktree_manager ltm;
lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock. lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock.
DB *directory; // Maps dnames to inames DB *directory; // Maps dnames to inames
...@@ -189,7 +189,7 @@ struct __toku_db_env_internal { ...@@ -189,7 +189,7 @@ struct __toku_db_env_internal {
// test-only environment function for running lock escalation // test-only environment function for running lock escalation
static inline void toku_env_run_lock_escalation_for_test(DB_ENV *env) { static inline void toku_env_run_lock_escalation_for_test(DB_ENV *env) {
toku::locktree::manager *mgr = &env->i->ltm; toku::locktree_manager *mgr = &env->i->ltm;
mgr->run_escalation_for_test(); mgr->run_escalation_for_test();
} }
......
...@@ -2348,7 +2348,7 @@ env_iterate_pending_lock_requests(DB_ENV *env, ...@@ -2348,7 +2348,7 @@ env_iterate_pending_lock_requests(DB_ENV *env,
return EINVAL; return EINVAL;
} }
toku::locktree::manager *mgr = &env->i->ltm; toku::locktree_manager *mgr = &env->i->ltm;
ltm_iterate_requests_callback_extra e(env, callback, extra); ltm_iterate_requests_callback_extra e(env, callback, extra);
return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e); return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
} }
......
...@@ -137,7 +137,7 @@ static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const ...@@ -137,7 +137,7 @@ static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const
map->insert_at(ranges, idx); map->insert_at(ranges, idx);
// let the manager know we're referencing this lt // let the manager know we're referencing this lt
toku::locktree::manager *ltm = &txn->mgrp->i->ltm; toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
ltm->reference_lt(ranges.lt); ltm->reference_lt(ranges.lt);
} else { } else {
invariant_zero(r); invariant_zero(r);
...@@ -148,7 +148,7 @@ static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const ...@@ -148,7 +148,7 @@ static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const
ranges.buffer->append(left_key, right_key); ranges.buffer->append(left_key, right_key);
size_t new_num_bytes = ranges.buffer->get_num_bytes(); size_t new_num_bytes = ranges.buffer->get_num_bytes();
invariant(new_num_bytes > old_num_bytes); invariant(new_num_bytes > old_num_bytes);
lt->get_mem_tracker()->note_mem_used(new_num_bytes - old_num_bytes); lt->get_manager()->note_mem_used(new_num_bytes - old_num_bytes);
toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex); toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
} }
...@@ -201,7 +201,7 @@ void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const ...@@ -201,7 +201,7 @@ void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const
// //
// We could theoretically steal the memory from the caller instead of copying // We could theoretically steal the memory from the caller instead of copying
// it, but it's simpler to have a callback API that doesn't transfer memory ownership. // it, but it's simpler to have a callback API that doesn't transfer memory ownership.
lt->get_mem_tracker()->note_mem_released(ranges.buffer->get_num_bytes()); lt->get_manager()->note_mem_released(ranges.buffer->get_num_bytes());
ranges.buffer->destroy(); ranges.buffer->destroy();
ranges.buffer->create(); ranges.buffer->create();
toku::range_buffer::iterator iter; toku::range_buffer::iterator iter;
...@@ -211,7 +211,7 @@ void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const ...@@ -211,7 +211,7 @@ void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const
ranges.buffer->append(rec.get_left_key(), rec.get_right_key()); ranges.buffer->append(rec.get_left_key(), rec.get_right_key());
iter.next(); iter.next();
} }
lt->get_mem_tracker()->note_mem_used(ranges.buffer->get_num_bytes()); lt->get_manager()->note_mem_used(ranges.buffer->get_num_bytes());
} else { } else {
// In rare cases, we may not find the associated locktree, because we are // In rare cases, we may not find the associated locktree, because we are
// racing with the transaction trying to add this locktree to the lt map // racing with the transaction trying to add this locktree to the lt map
...@@ -315,7 +315,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) { ...@@ -315,7 +315,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {
// release all of the locks this txn has ever successfully // release all of the locks this txn has ever successfully
// acquired and stored in the range buffer for this locktree // acquired and stored in the range buffer for this locktree
lt->release_locks(txnid, ranges->buffer); lt->release_locks(txnid, ranges->buffer);
lt->get_mem_tracker()->note_mem_released(ranges->buffer->get_num_bytes()); lt->get_manager()->note_mem_released(ranges->buffer->get_num_bytes());
ranges->buffer->destroy(); ranges->buffer->destroy();
toku_free(ranges->buffer); toku_free(ranges->buffer);
...@@ -324,6 +324,6 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) { ...@@ -324,6 +324,6 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {
toku::lock_request::retry_all_lock_requests(lt); toku::lock_request::retry_all_lock_requests(lt);
// Release our reference on this locktree // Release our reference on this locktree
toku::locktree::manager *ltm = &txn->mgrp->i->ltm; toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
ltm->release_lt(lt); ltm->release_lt(lt);
} }
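With the per-locktree memory tracker gone, lock-memory accounting flows through the manager the locktree already points at: note_mem_used() when a txn's range buffer grows, note_mem_released() when locks are released or escalation rewrites the buffer. A condensed sketch of the bookkeeping pattern used above, assuming a range_buffer *buffer holding a txn's acquired ranges:

    size_t old_bytes = buffer->get_num_bytes();
    buffer->append(left_key, right_key);
    size_t new_bytes = buffer->get_num_bytes();
    invariant(new_bytes > old_bytes);
    // charge the growth to the manager, which compares total lock
    // memory against its limit when deciding to escalate
    lt->get_manager()->note_mem_used(new_bytes - old_bytes);

    // ... and on release, credit everything back before destroying the buffer:
    lt->get_manager()->note_mem_released(buffer->get_num_bytes());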