Commit 48aa0d2f authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4920], [t:4953], separate ydb lock from hot indexing and checkpointing

git-svn-id: file:///svn/toku/tokudb@44202 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6ccd2251
......@@ -20,8 +20,7 @@
* to set all the "pending" bits and to create the checkpoint-in-progress versions
* of the header and translation table (btt).
* The following operations must take the multi_operation_lock:
* - insertion into multiple indexes
* - "replace-into" (matching delete and insert on a single key)
* - any set of operations that must be atomic with respect to begin checkpoint
*
* - checkpoint_safe_lock
* This is a new reader-writer lock.
......@@ -35,13 +34,6 @@
* The application can use this lock to disable checkpointing during other sensitive
* operations, such as making a backup copy of the database.
*
* - ydb_big_lock
* This is the existing lock used to serialize all access to tokudb.
* This lock is held by the checkpoint function only for as long as is required to
* to set all the "pending" bits and to create the checkpoint-in-progress versions
* of the header and translation table (btt).
*
* Once the "pending" bits are set and the snapshots are taken of the header and btt,
* most normal database operations are permitted to resume.
*
......@@ -118,10 +110,6 @@ static LSN last_completed_checkpoint_lsn;
static toku_pthread_rwlock_t checkpoint_safe_lock;
static toku_pthread_rwlock_t multi_operation_lock;
// Call through function pointers because this layer has no access to ydb lock functions.
static void (*ydb_lock)(void) = NULL;
static void (*ydb_unlock)(void) = NULL;
static BOOL initialized = FALSE; // sanity check
static volatile BOOL locked_mo = FALSE; // true when the multi_operation write lock is held (by checkpoint)
static volatile BOOL locked_cs = FALSE; // true when the checkpoint_safe write lock is held (by checkpoint)
......@@ -220,9 +208,7 @@ toku_checkpoint_safe_client_unlock(void) {
// Initialize the checkpoint mechanism, must be called before any client operations.
void
toku_checkpoint_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(void)) {
ydb_lock = ydb_lock_callback;
ydb_unlock = ydb_unlock_callback;
toku_checkpoint_init(void) {
multi_operation_lock_init();
checkpoint_safe_lock_init();
initialized = TRUE;
......@@ -280,7 +266,6 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
}
multi_operation_checkpoint_lock();
SET_CHECKPOINT_FOOTPRINT(20);
ydb_lock();
toku_ft_open_close_lock();
SET_CHECKPOINT_FOOTPRINT(30);
......@@ -289,7 +274,6 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
toku_ft_open_close_unlock();
multi_operation_checkpoint_unlock();
ydb_unlock();
SET_CHECKPOINT_FOOTPRINT(40);
if (r==0) {
......
......@@ -58,7 +58,7 @@ void toku_multi_operation_client_unlock(void);
// Initialize the checkpoint mechanism, must be called before any client operations.
// Must pass in function pointers to take/release ydb lock.
void toku_checkpoint_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(void));
void toku_checkpoint_init(void);
void toku_checkpoint_destroy(void);
......
......@@ -5423,35 +5423,28 @@ int toku_dump_ft (FILE *f, FT_HANDLE brt) {
return r;
}
int toku_ft_layer_init(void (*ydb_lock_callback)(void),
void (*ydb_unlock_callback)(void)) {
int toku_ft_layer_init(void) {
int r = 0;
//Portability must be initialized first
r = toku_portability_init();
if (r) { goto exit; }
toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback);
toku_checkpoint_init();
r = toku_ft_serialize_layer_init();
if (r) { goto exit; }
toku_ft_serialize_layer_init();
toku_mutex_init(&ft_open_close_lock, NULL);
exit:
return r;
}
int toku_ft_layer_destroy(void) {
int r = 0;
void toku_ft_layer_destroy(void) {
toku_mutex_destroy(&ft_open_close_lock);
if (r == 0)
r = toku_ft_serialize_layer_destroy();
if (r==0)
toku_checkpoint_destroy();
toku_ft_serialize_layer_destroy();
toku_checkpoint_destroy();
//Portability must be cleaned up last
if (r==0)
r = toku_portability_destroy();
return r;
toku_portability_destroy();
}
void toku_ft_open_close_lock(void) {
......
......@@ -238,14 +238,12 @@ struct ftstat64_s {
int
toku_ft_handle_stat64 (FT_HANDLE, TOKUTXN, struct ftstat64_s *stat) __attribute__ ((warn_unused_result));
int toku_ft_layer_init(void (*ydb_lock_callback)(void),
void (*ydb_unlock_callback)(void))
__attribute__ ((warn_unused_result));
int toku_ft_layer_init(void) __attribute__ ((warn_unused_result));
void toku_ft_open_close_lock(void);
void toku_ft_open_close_unlock(void);
int toku_ft_layer_destroy(void) __attribute__ ((warn_unused_result));
int toku_ft_serialize_layer_init(void) __attribute__ ((warn_unused_result));
int toku_ft_serialize_layer_destroy(void) __attribute__ ((warn_unused_result));
void toku_ft_layer_destroy(void);
void toku_ft_serialize_layer_init(void);
void toku_ft_serialize_layer_destroy(void);
void toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used);
// Effect: truncate file if overallocated by at least 32MiB
......
......@@ -783,7 +783,7 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKU
// (old_ft_h may be one of many handles to the dictionary.)
// txn that created the loader
// Requires:
// ydb_lock is held.
// multi operation lock is held.
// The brt is open. (which implies there can be no zombies.)
// The new file must be a valid dictionary.
// The block size and flags in the new file must match the existing BRT.
......
......@@ -67,17 +67,15 @@ static inline void do_toku_trace(const char *cp, int len) {
static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *ft_pool = NULL;
int
void
toku_ft_serialize_layer_init(void) {
num_cores = toku_os_get_number_active_processors();
int r = toku_thread_pool_create(&ft_pool, num_cores); lazy_assert_zero(r);
return 0;
}
int
void
toku_ft_serialize_layer_destroy(void) {
toku_thread_pool_destroy(&ft_pool);
return 0;
}
enum {FILE_CHANGE_INCREMENT = (16<<20)};
......
......@@ -172,6 +172,7 @@ struct tokutxn {
// Protected by the txn manager lock:
TOKUTXN_STATE state;
struct toku_list prepared_txns_link; // list of prepared transactions
uint32_t num_pin;
};
static inline int
......
......@@ -14,19 +14,14 @@
static int recovery_main(int argc, const char *const argv[]);
static void dummy(void) {}
int
main(int argc, const char *const argv[]) {
{
int rr = toku_ft_layer_init(dummy, dummy);
int rr = toku_ft_layer_init();
assert(rr==0);
}
int r = recovery_main(argc, argv);
{
int rr = toku_ft_layer_destroy();
assert(rr==0);
}
toku_ft_layer_destroy();
return r;
}
......
......@@ -234,16 +234,13 @@ default_parse_args (int argc, const char *argv[]) {
int test_main(int argc, const char *argv[]);
static void dummy(void) {}
int
main(int argc, const char *argv[]) {
initialize_dummymsn();
int rinit = toku_ft_layer_init(dummy, dummy);
int rinit = toku_ft_layer_init();
CKERR(rinit);
int r = test_main(argc, argv);
int rdestroy = toku_ft_layer_destroy();
CKERR(rdestroy);
toku_ft_layer_destroy();
return r;
}
......
......@@ -170,7 +170,8 @@ toku_txn_create_txn (
.txnid64 = xid,
.ancestor_txnid64 = (parent_tokutxn ? parent_tokutxn->ancestor_txnid64 : xid),
.xids = xids,
.roll_info = roll_info
.roll_info = roll_info,
.num_pin = 0
};
......
......@@ -27,6 +27,8 @@ struct txn_manager {
time_t oldest_living_starttime; // timestamp in seconds of when txn with oldest_living_xid started
struct toku_list prepared_txns; // transactions that have been prepared and are unresolved, but have not been returned through txn_recover.
struct toku_list prepared_and_returned_txns; // transactions that have been prepared and unresolved, and have been returned through txn_recover. We need this list so that we can restart the recovery.
toku_cond_t wait_for_unpin_of_txn;
};
static TXN_MANAGER_STATUS_S txn_manager_status;
......@@ -201,6 +203,8 @@ void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
txn_manager->oldest_living_starttime = 0;
toku_list_init(&txn_manager->prepared_txns);
toku_list_init(&txn_manager->prepared_and_returned_txns);
toku_cond_init(&txn_manager->wait_for_unpin_of_txn, 0);
*txn_managerp = txn_manager;
}
......@@ -210,6 +214,7 @@ void toku_txn_manager_destroy(TXN_MANAGER txn_manager) {
toku_omt_destroy(&txn_manager->live_root_txns);
toku_omt_destroy(&txn_manager->snapshot_txnids);
toku_omt_destroy(&txn_manager->live_list_reverse);
toku_cond_destroy(&txn_manager->wait_for_unpin_of_txn);
toku_free(txn_manager);
}
......@@ -692,6 +697,16 @@ void toku_txn_manager_note_abort_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
// for hot indexing, if hot index is processing
// this transaction in some leafentry, then we cannot change
// the state to commit or abort until
// hot index is done with that leafentry
while (txn->num_pin > 0) {
toku_cond_wait(
&txn_manager->wait_for_unpin_of_txn,
&txn_manager->txn_manager_lock
);
}
txn->state = TOKUTXN_ABORTING;
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
......@@ -702,6 +717,16 @@ void toku_txn_manager_note_commit_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
// for hot indexing, if hot index is processing
// this transaction in some leafentry, then we cannot change
// the state to commit or abort until
// hot index is done with that leafentry
while (txn->num_pin > 0) {
toku_cond_wait(
&txn_manager->wait_for_unpin_of_txn,
&txn_manager->txn_manager_lock
);
}
txn->state = TOKUTXN_COMMITTING;
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
......@@ -749,6 +774,20 @@ exit:
}
// needed for hot indexing
void toku_txn_manager_pin_live_txn_unlocked(TXN_MANAGER UU(txn_manager), TOKUTXN txn) {
assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
txn->num_pin++;
}
void toku_txn_manager_unpin_live_txn_unlocked(TXN_MANAGER txn_manager, TOKUTXN txn) {
assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
assert(txn->num_pin > 0);
txn->num_pin--;
if (txn->num_pin == 0) {
toku_cond_broadcast(&txn_manager->wait_for_unpin_of_txn);
}
}
void toku_txn_manager_suspend(TXN_MANAGER txn_manager) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
}
......
......@@ -78,6 +78,9 @@ int toku_txn_manager_recover_txn(
u_int32_t flags
);
void toku_txn_manager_pin_live_txn_unlocked(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_unpin_live_txn_unlocked(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_suspend(TXN_MANAGER txn_manager);
void toku_txn_manager_resume(TXN_MANAGER txn_manager);
......
......@@ -47,10 +47,9 @@ toku_portability_init(void) {
return r;
}
int
void
toku_portability_destroy(void) {
toku_memory_shutdown();
return 0;
}
int
......
......@@ -19,8 +19,7 @@ main(int argc, char *const argv[]) {
int ri = toku_portability_init();
assert(ri==0);
int r = test_main(argc, argv);
int rd = toku_portability_destroy();
assert(rd==0);
toku_portability_destroy();
return r;
}
......@@ -93,7 +93,7 @@ toku_ydb_lock(void) {
// Update status
STATUS_VALUE(YDB_LOCK_TAKEN)++;
if (new_num_waiters > STATUS_VALUE(YDB_MAX_WAITERS))
STATUS_VALUE(YDB_MAX_WAITERS) = new_num_waiters;
STATUS_VALUE(YDB_MAX_WAITERS) = new_num_waiters;
STATUS_VALUE(YDB_TOTAL_TIME_SINCE_START) = now - ydb_big_lock.starttime;
}
......@@ -105,7 +105,7 @@ ydb_unlock_internal(unsigned long useconds) {
tokutime_t time_held = now - ydb_big_lock.acquired_time;
STATUS_VALUE(YDB_TOTAL_TIME_YDB_LOCK_HELD) += time_held;
if (time_held > STATUS_VALUE(YDB_MAX_TIME_YDB_LOCK_HELD))
STATUS_VALUE(YDB_MAX_TIME_YDB_LOCK_HELD) = time_held;
STATUS_VALUE(YDB_MAX_TIME_YDB_LOCK_HELD) = time_held;
STATUS_VALUE(YDB_TOTAL_TIME_SINCE_START) = now - ydb_big_lock.starttime;
toku_mutex_unlock(&ydb_big_lock.lock);
......@@ -113,7 +113,7 @@ ydb_unlock_internal(unsigned long useconds) {
int new_num_waiters = __sync_add_and_fetch(&STATUS_VALUE(YDB_NUM_WAITERS_NOW), -1);
if (new_num_waiters > 0 && useconds > 0) {
__sync_add_and_fetch(&STATUS_VALUE(YDB_TOTAL_SLEEP_TIME), useconds);
__sync_add_and_fetch(&STATUS_VALUE(YDB_TOTAL_SLEEP_TIME), useconds);
usleep(useconds);
}
}
......
......@@ -12,6 +12,7 @@
#define TOKU_INDEXER_INTERNAL_H
#include <ft/txn_state.h>
#include <toku_pthread.h>
// the indexer_commit_keys is an ordered set of keys described by a DBT in the keys array.
// the array is a resizeable array with max size "max_keys" and current size "current_keys".
......@@ -24,7 +25,8 @@ struct indexer_commit_keys {
struct __toku_indexer_internal {
DB_ENV *env;
DB_TXN *txn;
DB_TXN *txn;
toku_mutex_t indexer_lock;
DB *src_db;
int N;
DB **dest_dbs; /* [N] */
......
This diff is collapsed.
......@@ -113,7 +113,8 @@ disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) {
static void
free_indexer_resources(DB_INDEXER *indexer) {
if ( indexer->i ) {
if ( indexer->i ) {
toku_mutex_destroy(&indexer->i->indexer_lock);
if ( indexer->i->lec ) { le_cursor_close(indexer->i->lec); }
if ( indexer->i->fnums ) {
toku_free(indexer->i->fnums);
......@@ -135,6 +136,16 @@ free_indexer(DB_INDEXER *indexer) {
}
}
void
toku_indexer_lock(DB_INDEXER* indexer) {
toku_mutex_lock(&indexer->i->indexer_lock);
}
void
toku_indexer_unlock(DB_INDEXER* indexer) {
toku_mutex_unlock(&indexer->i->indexer_lock);
}
int
toku_indexer_create_indexer(DB_ENV *env,
DB_TXN *txn,
......@@ -180,6 +191,8 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->close = close_indexer;
indexer->abort = abort_indexer;
toku_mutex_init(&indexer->i->indexer_lock, NULL);
//
// create and close a dummy loader to get redirection going for the hot indexer
// This way, if the hot index aborts, but other transactions have references to the
......@@ -273,7 +286,7 @@ build_index(DB_INDEXER *indexer) {
BOOL done = FALSE;
for (uint64_t loop_count = 0; !done; loop_count++) {
toku_ydb_lock();
toku_indexer_lock(indexer);
result = le_cursor_next(indexer->i->lec, &le);
if (result != 0) {
......@@ -295,10 +308,7 @@ build_index(DB_INDEXER *indexer) {
toku_ule_free(ule);
}
// if there is lock contention, then sleep for 1 millisecond after the unlock
// note: the value 1000 was empirically determined to provide good query performance
// during hotindexing
toku_ydb_unlock_and_yield(1000);
toku_indexer_unlock(indexer);
if (result == 0)
result = maybe_call_poll_func(indexer, loop_count);
......
......@@ -16,6 +16,12 @@ extern "C" {
#endif
// locking and unlocking functions to synchronize cursor position with
// XXX_multiple APIs
void toku_indexer_lock(DB_INDEXER* indexer);
void toku_indexer_unlock(DB_INDEXER* indexer);
// The indexer populates multiple destination db's from the contents of one source db.
// While the indexes are being built by the indexer, the application may continue to
// change the contents of the source db. The changes will be reflected into the destination
......
......@@ -9,6 +9,8 @@
#include <sys/stat.h>
#include "key-val.h"
toku_mutex_t put_lock;
enum {NUM_INDEXER_INDEXES=1};
static const int NUM_DBS = NUM_INDEXER_INDEXES + 1; // 1 for source DB
static const int NUM_ROWS = 100000;
......@@ -83,6 +85,7 @@ static void * client(void *arg)
dbt_init(&val, &v, sizeof(v));
while ( retry++ < 10 ) {
toku_mutex_lock(&put_lock);
rr = env->put_multiple(env,
cs->dbs[0],
txn,
......@@ -93,6 +96,7 @@ static void * client(void *arg)
dest_keys,
dest_vals,
cs->flags);
toku_mutex_unlock(&put_lock);
if ( rr == 0 ) break;
sleep(0);
}
......@@ -257,12 +261,14 @@ static void test_indexer(DB *src, DB **dbs)
CKERR(r);
if ( verbose ) printf("test_indexer create_indexer\n");
toku_mutex_lock(&put_lock);
r = env->create_indexer(env, txn, &indexer, src, NUM_DBS-1, &dbs[1], db_flags, 0);
CKERR(r);
r = indexer->set_error_callback(indexer, NULL, NULL);
CKERR(r);
r = indexer->set_poll_function(indexer, poll_print, NULL);
CKERR(r);
toku_mutex_unlock(&put_lock);
// start threads doing additional inserts - no lock issues since indexer already created
r = toku_pthread_create(&client_threads[0], 0, client, (void *)&client_specs[0]); CKERR(r);
......@@ -283,8 +289,10 @@ static void test_indexer(DB *src, DB **dbs)
}
if ( verbose ) printf("test_indexer close\n");
toku_mutex_lock(&put_lock);
r = indexer->close(indexer);
CKERR(r);
toku_mutex_unlock(&put_lock);
r = txn->commit(txn, DB_TXN_SYNC);
CKERR(r);
......@@ -306,6 +314,7 @@ static void test_indexer(DB *src, DB **dbs)
static void run_test(void)
{
int r;
toku_mutex_init(&put_lock, NULL);
r = system("rm -rf " ENVDIR); CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = toku_os_mkdir(ENVDIR "/log", S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -350,6 +359,7 @@ static void run_test(void)
for(int i=0;i<NUM_DBS;i++) {
r = dbs[i]->close(dbs[i], 0); CKERR(r);
}
toku_mutex_destroy(&put_lock);
r = env->close(env, 0); CKERR(r);
}
......
......@@ -426,8 +426,7 @@ main(int argc, char * const argv[])
toku_os_initialize_settings(1);
r = test_main(argc, argv);
#if IS_TDB && TOKU_WINDOWS
int rdestroy = toku_ydb_destroy();
CKERR(rdestroy);
toku_ydb_destroy();
#endif
return r;
}
......
......@@ -193,22 +193,22 @@ int
toku_ydb_init(void) {
int r = 0;
//Lower level must be initialized first.
if (r==0)
r = toku_ft_layer_init(toku_ydb_lock, toku_ydb_unlock);
if (r==0)
if (r==0) {
r = toku_ft_layer_init();
}
if (r==0) {
toku_ydb_lock_init();
}
return r;
}
// Do not clean up resources if env is panicked, just exit ugly
int
void
toku_ydb_destroy(void) {
int r = 0;
if (env_is_panicked == 0) {
toku_ydb_lock_destroy();
r = toku_ft_layer_destroy();
toku_ft_layer_destroy();
}
return r;
}
static int
......
......@@ -14,7 +14,7 @@ extern "C" {
int toku_ydb_init(void);
// Called when the ydb library is unloaded.
int toku_ydb_destroy(void);
void toku_ydb_destroy(void);
// db_env_create for the trace library
int db_env_create_toku10(DB_ENV **, u_int32_t) __attribute__((__visibility__("default")));
......
......@@ -21,8 +21,7 @@ static void __attribute__((constructor)) libtokudb_init(void) {
static void __attribute__((destructor)) libtokudb_destroy(void) {
// printf("%s:%s:%d\n", __FILE__, __FUNCTION__, __LINE__);
int r = toku_ydb_destroy();
assert(r==0);
toku_ydb_destroy();
}
#endif
......@@ -40,7 +39,7 @@ BOOL WINAPI DllMain(HINSTANCE h, DWORD reason, LPVOID reserved) {
r = toku_ydb_init();
break;
case DLL_PROCESS_DETACH:
r = toku_ydb_destroy();
toku_ydb_destroy();
break;
case DLL_THREAD_ATTACH:
//TODO: Any new thread code if necessary, i.e. allocate per-thread
......
......@@ -374,6 +374,39 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT keys[], DB *s
return r;
}
//
// if a hot index is in progress, gets the indexer
// also verifies that there is at most one hot index
// in progress. If it finds more than one, then returns EINVAL
//
static int
get_indexer_if_exists(
uint32_t num_dbs,
DB **db_array,
DB_INDEXER** indexerp
)
{
int r = 0;
DB_INDEXER* first_indexer = NULL;
for (uint32_t i = 0; i < num_dbs; i++) {
DB_INDEXER* indexer = toku_db_get_indexer(db_array[i]);
if (indexer) {
if (!first_indexer) {
first_indexer = indexer;
}
else {
if (first_indexer != indexer) {
r = EINVAL;
}
}
}
}
if (r == 0) {
*indexerp = first_indexer;
}
return r;
}
int
env_del_multiple(
DB_ENV *env,
......@@ -388,6 +421,7 @@ env_del_multiple(
{
int r;
DBT del_keys[num_dbs];
DB_INDEXER* indexer = NULL;
HANDLE_PANICKED_ENV(env);
......@@ -401,6 +435,10 @@ env_del_multiple(
}
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
r = get_indexer_if_exists(num_dbs, db_array, &indexer);
if (r) {
goto cleanup;
}
{
uint32_t lock_flags[num_dbs];
......@@ -442,7 +480,10 @@ env_del_multiple(
brts[which_db] = db->i->ft_handle;
}
toku_ydb_lock();
if (indexer) {
toku_indexer_lock(indexer);
}
toku_multi_operation_client_lock();
if (num_dbs == 1) {
r = log_del_single(txn, brts[0], &del_keys[0]);
}
......@@ -452,7 +493,10 @@ env_del_multiple(
if (r == 0)
r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key);
}
toku_ydb_unlock();
toku_multi_operation_client_unlock();
if (indexer) {
toku_indexer_unlock(indexer);
}
cleanup:
if (r == 0)
......@@ -527,6 +571,7 @@ env_put_multiple_internal(
int r;
DBT put_keys[num_dbs];
DBT put_vals[num_dbs];
DB_INDEXER* indexer = NULL;
HANDLE_PANICKED_ENV(env);
......@@ -544,6 +589,10 @@ env_put_multiple_internal(
}
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
r = get_indexer_if_exists(num_dbs, db_array, &indexer);
if (r) {
goto cleanup;
}
for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
......@@ -587,7 +636,10 @@ env_put_multiple_internal(
brts[which_db] = db->i->ft_handle;
}
toku_ydb_lock();
if (indexer) {
toku_indexer_lock(indexer);
}
toku_multi_operation_client_lock();
if (num_dbs == 1) {
r = log_put_single(txn, brts[0], &put_keys[0], &put_vals[0]);
}
......@@ -597,7 +649,10 @@ env_put_multiple_internal(
if (r == 0) {
r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key);
}
toku_ydb_unlock();
toku_multi_operation_client_unlock();
if (indexer) {
toku_indexer_unlock(indexer);
}
cleanup:
if (r == 0)
......@@ -617,6 +672,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
int r = 0;
HANDLE_PANICKED_ENV(env);
DB_INDEXER* indexer = NULL;
if (!txn) {
r = EINVAL;
......@@ -628,6 +684,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
}
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
r = get_indexer_if_exists(num_dbs, db_array, &indexer);
if (r) {
goto cleanup;
}
{
uint32_t n_del_dbs = 0;
......@@ -731,9 +791,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
n_put_dbs++;
}
}
// grab the ydb lock for the actual work that
// depends on it
toku_ydb_lock();
if (indexer) {
toku_indexer_lock(indexer);
}
toku_multi_operation_client_lock();
if (r == 0 && n_del_dbs > 0) {
if (n_del_dbs == 1)
r = log_del_single(txn, del_fts[0], &del_keys[0]);
......@@ -751,7 +812,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
if (r == 0)
r = do_put_multiple(txn, n_put_dbs, put_dbs, put_keys, put_vals, src_db, new_src_key);
}
toku_ydb_unlock();
toku_multi_operation_client_unlock();
if (indexer) {
toku_indexer_unlock(indexer);
}
}
cleanup:
......
......@@ -206,7 +206,7 @@ int toku_set_func_fclose(int (*)(FILE*));
int toku_set_func_read(ssize_t (*)(int, void *, size_t));
int toku_set_func_pread (ssize_t (*)(int, void *, size_t, off_t));
int toku_portability_init (void);
int toku_portability_destroy (void);
void toku_portability_destroy (void);
#if defined(__cplusplus) || defined(__cilkplusplus)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment