Commit 5729e870 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4877], [t:4966], [t:4952], [t:4881], [t:4918], merge to main

git-svn-id: file:///svn/toku/tokudb@44130 c7de825b-a66e-492c-adef-691d508d4ae1
parent 31d817a3
default:\
:skip=/windows/,/dbg/,/opt/,/build/,/gccdbg/,/gccopt/,/iccdbg/,/iccopt/,/clangdbg/,/NightlyRelease/,/NightlyCoverage/,/NightlyDebug/,/Debug/,/Release/,/Coverage/,/cov/:
......@@ -8,6 +8,8 @@ file(GLOB_RECURSE all_srcs
src/*.c
utils/*.c
db-benchmark-test/*.c
)
list(APPEND all_srcs
${CMAKE_CURRENT_BINARY_DIR}/ft/log_code.c
${CMAKE_CURRENT_BINARY_DIR}/ft/log_print.c
)
......@@ -20,6 +22,8 @@ file(GLOB_RECURSE all_hdrs
src/*.h
utils/*.h
db-benchmark-test/*.h
)
list(APPEND all_hdrs
${CMAKE_CURRENT_BINARY_DIR}/toku_include/config.h
${CMAKE_CURRENT_BINARY_DIR}/buildheader/db.h
${CMAKE_CURRENT_BINARY_DIR}/ft/log_header.h
......@@ -76,14 +80,21 @@ endif ()
option(USE_GTAGS "Build the gtags database." ON)
if (USE_GTAGS)
find_program(GTAGS "gtags")
find_program(MKID "mkid")
if (NOT GTAGS MATCHES NOTFOUND)
## todo: use global -u instead of gtags each time
file(WRITE "${CMAKE_CURRENT_BINARY_DIR}/gtags.files" "")
foreach(file ${all_srcs} ${all_hdrs})
file(APPEND "${CMAKE_CURRENT_BINARY_DIR}/gtags.files" "${file}\n")
endforeach(file)
if (NOT MKID MATCHES NOTFOUND)
set(idutils_option "-I")
endif ()
add_custom_command(
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GTAGS"
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GRTAGS"
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GPATH"
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GSYMS"
COMMAND ${GTAGS} --gtagsconf "${CMAKE_CURRENT_SOURCE_DIR}/.globalrc"
COMMAND ${GTAGS} -i ${idutils_option} -f "${CMAKE_CURRENT_BINARY_DIR}/gtags.files"
DEPENDS ${all_srcs} ${all_hdrs} install_tdb_h generate_config_h generate_log_code
WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}")
add_custom_target(build_GTAGS ALL DEPENDS
......
......@@ -59,6 +59,8 @@ set(FT_SOURCES
quicklz.c
recover.c
rollback.c
rollback-apply.c
rollback-ct-callbacks.c
roll.c
sort.c
sub_block.c
......
......@@ -3246,14 +3246,14 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
int r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn),
toku_txn_get_txnid(toku_logger_txn_parent(txn)),
txn->rollentry_raw_count,
txn->roll_info.rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
txn->roll_info.num_rollback_nodes,
txn->roll_info.num_rollentries,
txn->roll_info.spilled_rollback_head,
txn->roll_info.spilled_rollback_tail,
txn->roll_info.current_rollback);
assert(r==0);
return 0;
}
......@@ -3263,14 +3263,14 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
int r = toku_log_xstillopenprepared(logger, NULL, 0,
toku_txn_get_txnid(txn),
&xa_xid,
txn->rollentry_raw_count,
txn->roll_info.rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
txn->roll_info.num_rollback_nodes,
txn->roll_info.num_rollentries,
txn->roll_info.spilled_rollback_head,
txn->roll_info.spilled_rollback_tail,
txn->roll_info.current_rollback);
assert(r==0);
return 0;
}
......
......@@ -2687,45 +2687,44 @@ toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
}
int
toku_ft_maybe_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging, enum ft_msg_type type) {
toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging, enum ft_msg_type type) {
assert(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE);
int r = 0;
XIDS message_xids = xids_get_root_xids(); //By default use committed messages
TXNID xid = toku_txn_get_txnid(txn);
if (txn) {
if (brt->ft->txnid_that_created_or_locked_when_empty != xid) {
if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) {
BYTESTRING keybs = {key->size, key->data};
r = toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(brt->ft->cf), &keybs);
if (r!=0) return r;
r = toku_txn_note_ft(txn, brt->ft);
r = toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
if (r!=0) return r;
toku_txn_maybe_note_ft(txn, ft_h->ft);
//We have transactions, and this is not 2440. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) {
else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) {
//We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
}
TOKULOGGER logger = toku_txn_logger(txn);
if (do_logging && logger &&
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
if (type == FT_INSERT) {
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
}
else {
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
}
if (r!=0) return r;
}
LSN treelsn;
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
r = 0;
} else {
r = toku_ft_send_insert(brt, key, val, message_xids, type);
r = toku_ft_send_insert(ft_h, key, val, message_xids, type);
}
return r;
}
......@@ -2740,7 +2739,7 @@ ft_send_update_msg(FT_HANDLE brt, FT_MSG_S *msg, TOKUTXN txn) {
}
int
toku_ft_maybe_update(FT_HANDLE brt, const DBT *key, const DBT *update_function_extra,
toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_extra,
TOKUTXN txn, BOOL oplsn_valid, LSN oplsn,
BOOL do_logging) {
int r = 0;
......@@ -2749,32 +2748,31 @@ toku_ft_maybe_update(FT_HANDLE brt, const DBT *key, const DBT *update_function_e
if (txn) {
BYTESTRING keybs = { key->size, key->data };
r = toku_logger_save_rollback_cmdupdate(
txn, toku_cachefile_filenum(brt->ft->cf), &keybs);
if (r != 0) { goto cleanup; }
r = toku_txn_note_ft(txn, brt->ft);
txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
if (r != 0) { goto cleanup; }
toku_txn_maybe_note_ft(txn, ft_h->ft);
}
TOKULOGGER logger = toku_txn_logger(txn);
if (do_logging && logger &&
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING extrabs = {.len=update_function_extra->size,
.data=update_function_extra->data};
r = toku_log_enq_update(logger, NULL, 0,
toku_cachefile_filenum(brt->ft->cf),
toku_cachefile_filenum(ft_h->ft->cf),
xid, keybs, extrabs);
if (r != 0) { goto cleanup; }
}
LSN treelsn;
if (oplsn_valid &&
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
r = 0;
} else {
FT_MSG_S msg = { FT_UPDATE, ZERO_MSN, NULL,
.u.id = { key, update_function_extra }};
r = ft_send_update_msg(brt, &msg, txn);
r = ft_send_update_msg(ft_h, &msg, txn);
}
cleanup:
......@@ -2782,7 +2780,7 @@ cleanup:
}
int
toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
TOKUTXN txn, BOOL oplsn_valid, LSN oplsn,
BOOL do_logging, BOOL is_resetting_op) {
int r = 0;
......@@ -2790,19 +2788,18 @@ toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
TXNID xid = toku_txn_get_txnid(txn);
u_int8_t resetting = is_resetting_op ? 1 : 0;
if (txn) {
r = toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(brt->ft->cf), resetting);
if (r != 0) { goto cleanup; }
r = toku_txn_note_ft(txn, brt->ft);
r = toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(ft_h->ft->cf), resetting);
if (r != 0) { goto cleanup; }
toku_txn_maybe_note_ft(txn, ft_h->ft);
}
TOKULOGGER logger = toku_txn_logger(txn);
if (do_logging && logger &&
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING extrabs = {.len=update_function_extra->size,
.data=update_function_extra->data};
r = toku_log_enq_updatebroadcast(logger, NULL, 0,
toku_cachefile_filenum(brt->ft->cf),
toku_cachefile_filenum(ft_h->ft->cf),
xid, extrabs, resetting);
if (r != 0) { goto cleanup; }
}
......@@ -2810,14 +2807,14 @@ toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
//TODO(yoni): remove treelsn here and similar calls (no longer being used)
LSN treelsn;
if (oplsn_valid &&
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
r = 0;
} else {
DBT nullkey;
const DBT *nullkeyp = toku_init_dbt(&nullkey);
FT_MSG_S msg = { FT_UPDATE_BROADCAST_ALL, ZERO_MSN, NULL,
.u.id = { nullkeyp, update_function_extra }};
r = ft_send_update_msg(brt, &msg, txn);
r = ft_send_update_msg(ft_h, &msg, txn);
}
cleanup:
......@@ -2884,38 +2881,37 @@ toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
}
int
toku_ft_maybe_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging) {
toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging) {
int r;
XIDS message_xids = xids_get_root_xids(); //By default use committed messages
TXNID xid = toku_txn_get_txnid(txn);
if (txn) {
if (brt->ft->txnid_that_created_or_locked_when_empty != xid) {
if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) {
BYTESTRING keybs = {key->size, key->data};
r = toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(brt->ft->cf), &keybs);
if (r!=0) return r;
r = toku_txn_note_ft(txn, brt->ft);
r = toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
if (r!=0) return r;
toku_txn_maybe_note_ft(txn, ft_h->ft);
//We have transactions, and this is not 2440. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) {
else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) {
//We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path
message_xids = toku_txn_get_xids(txn);
}
}
TOKULOGGER logger = toku_txn_logger(txn);
if (do_logging && logger &&
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs);
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
if (r!=0) return r;
}
LSN treelsn;
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
r = 0;
} else {
r = toku_ft_send_delete(brt, key, message_xids);
r = toku_ft_send_delete(ft_h, key, message_xids);
}
return r;
}
......@@ -3072,7 +3068,7 @@ verify_builtin_comparisons_consistent(FT_HANDLE t, u_int32_t flags) {
int
toku_ft_change_descriptor(
FT_HANDLE t,
FT_HANDLE ft_h,
const DBT* old_descriptor,
const DBT* new_descriptor,
BOOL do_log,
......@@ -3092,19 +3088,18 @@ toku_ft_change_descriptor(
// put information into rollback file
r = toku_logger_save_rollback_change_fdescriptor(
txn,
toku_cachefile_filenum(t->ft->cf),
toku_cachefile_filenum(ft_h->ft->cf),
&old_desc_bs
);
if (r != 0) { goto cleanup; }
r = toku_txn_note_ft(txn, t->ft);
if (r != 0) { goto cleanup; }
toku_txn_maybe_note_ft(txn, ft_h->ft);
if (do_log) {
TOKULOGGER logger = toku_txn_logger(txn);
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_change_fdescriptor(
logger, NULL, 0,
toku_cachefile_filenum(t->ft->cf),
toku_cachefile_filenum(ft_h->ft->cf),
xid,
old_desc_bs,
new_desc_bs,
......@@ -3115,8 +3110,8 @@ toku_ft_change_descriptor(
// write new_descriptor to header
new_d.dbt = *new_descriptor;
fd = toku_cachefile_get_fd (t->ft->cf);
r = toku_update_descriptor(t->ft, &new_d, fd);
fd = toku_cachefile_get_fd (ft_h->ft->cf);
r = toku_update_descriptor(ft_h->ft, &new_d, fd);
// very infrequent operation, worth precise threadsafe count
if (r == 0) {
STATUS_VALUE(FT_DESCRIPTOR_SET)++;
......@@ -3124,7 +3119,7 @@ toku_ft_change_descriptor(
if (r!=0) goto cleanup;
if (update_cmp_descriptor) {
toku_ft_update_cmp_descriptor(t->ft);
toku_ft_update_cmp_descriptor(ft_h->ft);
}
cleanup:
return r;
......@@ -3148,7 +3143,7 @@ toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
// fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix).
// The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn .
static int
ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) {
ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) {
int r;
BOOL txn_created = FALSE;
char *fname_in_cwd = NULL;
......@@ -3157,8 +3152,8 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
BOOL did_create = FALSE;
toku_ft_open_close_lock();
if (t->did_set_flags) {
r = verify_builtin_comparisons_consistent(t, t->options.flags);
if (ft_h->did_set_flags) {
r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
if (r!=0) { goto exit; }
}
if (txn && txn->logger->is_panicked) {
......@@ -3184,21 +3179,21 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
assert_zero(r);
}
txn_created = (BOOL)(txn!=NULL);
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, t->options.flags, t->options.nodesize, t->options.basementnodesize, t->options.compression_method);
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, ft_h->options.flags, ft_h->options.nodesize, ft_h->options.basementnodesize, ft_h->options.compression_method);
assert_zero(r); // only possible failure is panic, which we check above
r = ft_create_file(t, fname_in_cwd, &fd);
r = ft_create_file(ft_h, fname_in_cwd, &fd);
assert_zero(r);
}
if (r) { goto exit; }
r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum);
if (r) { goto exit; }
}
assert(t->options.nodesize>0);
assert(ft_h->options.nodesize>0);
BOOL was_already_open;
if (is_create) {
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &ft, &was_already_open);
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
r = toku_create_new_ft(&ft, &t->options, cf, txn);
r = toku_create_new_ft(&ft, &ft_h->options, cf, txn);
if (r) { goto exit; }
}
else if (r!=0) {
......@@ -3213,21 +3208,21 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
// so it is ok for toku_read_ft_and_store_in_cachefile to have read
// the header via toku_read_ft_and_store_in_cachefile
} else {
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &ft, &was_already_open);
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
if (r) { goto exit; }
}
if (!t->did_set_flags) {
r = verify_builtin_comparisons_consistent(t, t->options.flags);
if (!ft_h->did_set_flags) {
r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
if (r) { goto exit; }
} else if (t->options.flags != ft->h->flags) { /* if flags have been set then flags must match */
} else if (ft_h->options.flags != ft->h->flags) { /* if flags have been set then flags must match */
r = EINVAL;
goto exit;
}
toku_ft_handle_inherit_options(t, ft);
toku_ft_handle_inherit_options(ft_h, ft);
if (!was_already_open) {
if (!did_create) { //Only log the fopen that OPENs the file. If it was already open, don't log.
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), t->options.flags);
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), ft_h->options.flags);
assert_zero(r);
}
}
......@@ -3257,12 +3252,11 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
// with the brt, the function is not allowed to fail
// Code that handles failure (located below "exit"),
// depends on this
toku_ft_note_ft_handle_open(ft, t);
toku_ft_note_ft_handle_open(ft, ft_h);
if (txn_created) {
assert(txn);
toku_ft_suppress_rollbacks(ft, txn);
r = toku_txn_note_ft(txn, ft);
assert_zero(r);
toku_txn_maybe_note_ft(txn, ft);
}
//Opening a brt may restore to previous checkpoint. Truncate if necessary.
......@@ -5506,11 +5500,8 @@ toku_ft_remove_on_commit(FT_HANDLE handle, TOKUTXN txn) {
cf = handle->ft->cf;
FT ft = toku_cachefile_get_userdata(cf);
// TODO: toku_txn_note_ft should return void
// Assert success here because note_ft also asserts success internally.
r = toku_txn_note_ft(txn, ft);
assert(r == 0);
toku_txn_maybe_note_ft(txn, ft);
// If the txn commits, the commit MUST be in the log before the file is actually unlinked
toku_txn_force_fsync_on_commit(txn);
// make entry in rollback log
......
......@@ -776,11 +776,11 @@ toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) {
*****/
int
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTXN txn) {
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKUTXN txn) {
// Input args:
// new file name for dictionary (relative to env)
// old_ft is a live brt of open handle ({DB, BRT} pair) that currently refers to old dictionary file.
// (old_ft may be one of many handles to the dictionary.)
// old_ft_h is a live brt of open handle ({DB, BRT} pair) that currently refers to old dictionary file.
// (old_ft_h may be one of many handles to the dictionary.)
// txn that created the loader
// Requires:
// ydb_lock is held.
......@@ -797,11 +797,11 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTX
// If the txn aborts, then this operation will be undone
int r;
FT old_h = old_ft->ft;
FT old_ft = old_ft_h->ft;
// dst file should not be open. (implies that dst and src are different because src must be open.)
{
CACHETABLE ct = toku_cachefile_get_cachetable(old_h->cf);
CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
CACHEFILE cf;
r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
if (r==0) {
......@@ -813,25 +813,24 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTX
}
if (txn) {
r = toku_txn_note_ft(txn, old_h); // mark old brt as touched by this txn
assert_zero(r);
toku_txn_maybe_note_ft(txn, old_ft); // mark old ft as touched by this txn
}
FT new_h;
r = dictionary_redirect_internal(dst_fname_in_env, old_h, txn, &new_h);
FT new_ft;
r = dictionary_redirect_internal(dst_fname_in_env, old_ft, txn, &new_ft);
assert_zero(r);
// make rollback log entry
if (txn) {
r = toku_txn_note_ft(txn, new_h); // mark new brt as touched by this txn
toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn
FILENUM old_filenum = toku_cachefile_filenum(old_h->cf);
FILENUM new_filenum = toku_cachefile_filenum(new_h->cf);
FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
r = toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
assert_zero(r);
TXNID xid = toku_txn_get_txnid(txn);
toku_ft_suppress_rollbacks(new_h, txn);
toku_ft_suppress_rollbacks(new_ft, txn);
r = toku_log_suppress_rollback(txn->logger, NULL, 0, new_filenum, xid);
assert_zero(r);
}
......@@ -849,29 +848,14 @@ static int find_xid (OMTVALUE v, void *txnv) {
return 0;
}
// returns if ref was added
BOOL
toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn) {
BOOL ref_added = FALSE;
OMTVALUE txnv;
u_int32_t index;
toku_ft_grab_reflock(h);
// Does brt already know about transaction txn?
int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv, &index);
if (r==0) {
// It's already there.
assert((TOKUTXN)txnv==txn);
ref_added = FALSE;
goto exit;
}
// Otherwise it's not there.
// Insert reference to transaction into brt
r = toku_omt_insert_at(h->txns, txn, index);
// Insert reference to transaction into ft
void
toku_ft_add_txn_ref(FT ft, TOKUTXN txn) {
toku_ft_grab_reflock(ft);
uint32_t idx;
int r = toku_omt_insert(ft->txns, txn, find_xid, txn, &idx);
assert(r==0);
ref_added = TRUE;
exit:
toku_ft_release_reflock(h);
return ref_added;
toku_ft_release_reflock(ft);
}
static void
......
......@@ -12,6 +12,7 @@
#include "cachetable.h"
#include "log.h"
#include "ft-search.h"
#include "ft-ops.h"
#include "compress.h"
// remove a ft, transactionless.
......@@ -68,8 +69,8 @@ void toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created);
// This redefines which xid created the dictionary.
BOOL
toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn);
void
toku_ft_add_txn_ref(FT h, TOKUTXN txn);
void
toku_ft_remove_txn_ref(FT h, TOKUTXN txn);
......
......@@ -104,24 +104,7 @@ struct tokulogger {
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles);
struct tokutxn {
u_int64_t txnid64; /* this happens to be the first lsn */
u_int64_t ancestor_txnid64; /* this is the lsn of root transaction */
u_int64_t snapshot_txnid64; /* this is the lsn of the snapshot */
TOKULOGGER logger;
TOKUTXN parent;
DB_TXN* container_db_txn; // reference to DB_TXN that contains this tokutxn
time_t starttime; // timestamp in seconds of transaction start
u_int64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
OMT open_fts; // a collection of the brts that we touched. Indexed by filenum.
TXN_SNAPSHOT_TYPE snapshot_type;
OMT live_root_txn_list; // the root txns live when the root ancestor (self if a root) started
XIDS xids; //Represents the xid list
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
void * progress_poll_fun_extra;
struct txn_roll_info {
// these are number of rollback nodes and rollback entries for this txn.
//
// the current rollback node below has sequence number num_rollback_nodes - 1
......@@ -131,35 +114,76 @@ struct tokutxn {
// rollback nodes for this transaction is non-zero, then we will use
// the number of rollback nodes to know which sequence number to assign
// to a new one we create
uint64_t num_rollback_nodes;
uint64_t num_rollentries;
uint64_t num_rollentries_processed;
uint64_t num_rollback_nodes;
uint64_t num_rollentries;
uint64_t num_rollentries_processed;
uint64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
// spilled rollback nodes are rollback nodes that were gorged by this
// transaction, retired, and saved in a list.
// the spilled rollback head is the block number of the first rollback node
// that makes up the rollback log chain
BLOCKNUM spilled_rollback_head;
uint32_t spilled_rollback_head_hash;
BLOCKNUM spilled_rollback_head;
uint32_t spilled_rollback_head_hash;
// the spilled rollback is the block number of the last rollback node that
// makes up the rollback log chain.
BLOCKNUM spilled_rollback_tail;
uint32_t spilled_rollback_tail_hash;
BLOCKNUM spilled_rollback_tail;
uint32_t spilled_rollback_tail_hash;
// the current rollback node block number we may use. if this is ROLLBACK_NONE,
// then we need to create one and set it here before using it.
BLOCKNUM current_rollback;
uint32_t current_rollback_hash;
BLOCKNUM current_rollback;
uint32_t current_rollback_hash;
};
BOOL recovered_from_checkpoint;
struct tokutxn {
// These don't change after create:
const time_t starttime; // timestamp in seconds of transaction start
const u_int64_t txnid64; // this happens to be the first lsn
const u_int64_t ancestor_txnid64; // this is the lsn of root transaction
const u_int64_t snapshot_txnid64; // this is the lsn of the snapshot
const TXN_SNAPSHOT_TYPE snapshot_type;
const BOOL recovered_from_checkpoint;
const TOKULOGGER logger;
const TOKUTXN parent;
// These don't either but they're created in a way that's hard to make
// strictly const.
DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn
OMT live_root_txn_list; // the root txns live when the root ancestor (self if a root) started.
XIDS xids; // Represents the xid list
// These are not read until a commit, prepare, or abort starts, and
// they're "monotonic" (only go false->true) during operation:
BOOL checkpoint_needed_before_commit;
BOOL do_fsync;
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
// Not used until commit, prepare, or abort starts:
LSN do_fsync_lsn;
TOKU_XA_XID xa_xid; // for prepared transactions
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
void *progress_poll_fun_extra;
toku_mutex_t txn_lock;
// Protected by the txn lock:
OMT open_fts; // a collection of the brts that we touched. Indexed by filenum.
struct txn_roll_info roll_info; // Info used to manage rollback entries
// Protected by the txn manager lock:
TOKUTXN_STATE state;
LSN do_fsync_lsn;
BOOL do_fsync;
TOKU_XA_XID xa_xid; // for prepared transactions
struct toku_list prepared_txns_link; // list of prepared transactions
};
static inline int
txn_has_current_rollback_log(TOKUTXN txn) {
return txn->roll_info.current_rollback.b != ROLLBACK_NONE.b;
}
static inline int
txn_has_spilled_rollback_logs(TOKUTXN txn) {
return txn->roll_info.spilled_rollback_tail.b != ROLLBACK_NONE.b;
}
struct txninfo {
uint64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
uint32_t num_fts;
......
......@@ -573,6 +573,7 @@ generate_rollbacks (void) {
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " toku_txn_lock(txn);\n");
fprintf(cf, " ROLLBACK_LOG_NODE log;\n");
fprintf(cf, " toku_get_and_pin_rollback_log_for_new_entry(txn, &log);\n");
// 'memdup' all BYTESTRINGS here
......@@ -601,19 +602,20 @@ generate_rollbacks (void) {
fprintf(cf, " struct roll_entry *v;\n");
fprintf(cf, " size_t mem_needed = sizeof(v->u.%s) + __builtin_offsetof(struct roll_entry, u.%s);\n", lt->name, lt->name);
fprintf(cf, " v = toku_malloc_in_rollback(log, mem_needed);\n");
fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " assert(v);\n");
fprintf(cf, " v->cmd = (enum rt_cmd)%u;\n", lt->command_and_flags&0xff);
DO_FIELDS(field_type, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, field_type->name, field_type->name));
fprintf(cf, " v->prev = log->newest_logentry;\n");
fprintf(cf, " if (log->oldest_logentry==NULL) log->oldest_logentry=v;\n");
fprintf(cf, " log->newest_logentry = v;\n");
fprintf(cf, " log->rollentry_resident_bytecount += rollback_fsize;\n");
fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n");
fprintf(cf, " txn->num_rollentries++;\n");
fprintf(cf, " txn->roll_info.rollentry_raw_count += rollback_fsize;\n");
fprintf(cf, " txn->roll_info.num_rollentries++;\n");
fprintf(cf, " log->dirty = TRUE;\n");
fprintf(cf, " // spill and unpin assert success internally\n");
fprintf(cf, " toku_maybe_spill_rollbacks(txn, log);\n");
fprintf(cf, " toku_rollback_log_unpin(txn, log);\n");
fprintf(cf, " toku_txn_unlock(txn);\n");
fprintf(cf, " return 0;\n}\n");
});
......
......@@ -480,7 +480,7 @@ recover_transaction(TOKUTXN *txnp, TXNID xid, TXNID parentxid, TOKULOGGER logger
assert(r == 0);
assert(txn==NULL);
}
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL);
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL, true);
assert(r == 0);
if (txnp) *txnp = txn;
return 0;
......@@ -623,10 +623,9 @@ static int toku_recover_suppress_rollback (struct logtype_suppress_rollback *UU(
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
assert(txn!=NULL);
FT h = tuple->ft_handle->ft;
toku_ft_suppress_rollbacks(h, txn);
r = toku_txn_note_ft(txn, tuple->ft_handle->ft);
assert(r==0);
FT ft = tuple->ft_handle->ft;
toku_ft_suppress_rollbacks(ft, txn);
toku_txn_maybe_note_ft(txn, ft);
}
return 0;
}
......@@ -884,9 +883,8 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
r = toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, TRUE, l->lsn, FALSE, FT_INSERT);
assert(r == 0);
r = toku_txn_note_ft(txn, tuple->ft_handle->ft);
assert(r == 0);
}
toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft);
}
return 0;
}
......
......@@ -7,6 +7,7 @@
/* rollback and rollforward routines. */
#include "includes.h"
#include "rollback-apply.h"
#include "xids.h"
// functionality provided by roll.c is exposed by an autogenerated
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "fttypes.h"
#include "log-internal.h"
#include "rollback-apply.h"
static void
poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
if (txn->progress_poll_fun) {
TOKU_TXN_PROGRESS_S progress = {
.entries_total = txn->roll_info.num_rollentries,
.entries_processed = txn->roll_info.num_rollentries_processed,
.is_commit = is_commit,
.stalled_on_checkpoint = stall_for_checkpoint};
txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
}
}
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
txn->roll_info.num_rollentries_processed++;
if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, TRUE, FALSE);
}
return r;
}
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
txn->roll_info.num_rollentries_processed++;
if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, FALSE, FALSE);
}
return r;
}
static int
note_ft_used_in_txns_parent(OMTVALUE ftv, u_int32_t UU(index), void *txnv) {
TOKUTXN child = txnv;
TOKUTXN parent = child->parent;
FT ft = ftv;
toku_txn_maybe_note_ft(parent, ft);
if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
//Pass magic "no rollback needed" flag to parent.
ft->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
}
if (ft->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
//Pass magic "no recovery needed" flag to parent.
ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
}
return 0;
}
static int
apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
int r = 0;
// do the commit/abort calls and free everything
// we do the commit/abort calls in reverse order too.
struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__);
BLOCKNUM next_log = ROLLBACK_NONE;
uint32_t next_log_hash = 0;
BOOL is_current = FALSE;
if (txn_has_current_rollback_log(txn)) {
next_log = txn->roll_info.current_rollback;
next_log_hash = txn->roll_info.current_rollback_hash;
is_current = TRUE;
}
else if (txn_has_spilled_rollback_logs(txn)) {
next_log = txn->roll_info.spilled_rollback_tail;
next_log_hash = txn->roll_info.spilled_rollback_tail_hash;
}
uint64_t last_sequence = txn->roll_info.num_rollback_nodes;
BOOL found_head = FALSE;
while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log;
//pin log
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);
toku_maybe_prefetch_previous_rollback_log(txn, log);
last_sequence = log->sequence;
if (func) {
while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev;
r = func(txn, item, lsn);
if (r!=0) return r;
}
}
if (next_log.b == txn->roll_info.spilled_rollback_head.b) {
assert(!found_head);
found_head = TRUE;
assert(log->sequence == 0);
}
next_log = log->previous;
next_log_hash = log->previous_hash;
{
//Clean up transaction structure to prevent
//toku_txn_close from double-freeing
if (is_current) {
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
is_current = FALSE;
}
else {
txn->roll_info.spilled_rollback_tail = next_log;
txn->roll_info.spilled_rollback_tail_hash = next_log_hash;
}
if (found_head) {
assert(next_log.b == ROLLBACK_NONE.b);
txn->roll_info.spilled_rollback_head = next_log;
txn->roll_info.spilled_rollback_head_hash = next_log_hash;
}
}
toku_rollback_log_unpin_and_remove(txn, log);
}
return r;
}
//Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent.
int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
int r=0;
if (txn->parent!=0) {
// First we must put a rollinclude entry into the parent if we spilled
if (txn_has_spilled_rollback_logs(txn)) {
uint64_t num_nodes = txn->roll_info.num_rollback_nodes;
if (txn_has_current_rollback_log(txn)) {
num_nodes--; //Don't count the in-progress rollback log.
}
r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
txn->roll_info.spilled_rollback_head, txn->roll_info.spilled_rollback_head_hash,
txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash);
if (r!=0) return r;
//Remove ownership from child.
txn->roll_info.spilled_rollback_head = ROLLBACK_NONE;
txn->roll_info.spilled_rollback_head_hash = 0;
txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE;
txn->roll_info.spilled_rollback_tail_hash = 0;
}
// if we're commiting a child rollback, put its entries into the parent
// by pinning both child and parent and then linking the child log entry
// list to the end of the parent log entry list.
if (txn_has_current_rollback_log(txn)) {
//Pin parent log
ROLLBACK_LOG_NODE parent_log;
toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
//Pin child log
ROLLBACK_LOG_NODE child_log;
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback,
txn->roll_info.current_rollback_hash, &child_log);
toku_rollback_verify_contents(child_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
// Append the list to the front of the parent.
if (child_log->oldest_logentry) {
// There are some entries, so link them in.
child_log->oldest_logentry->prev = parent_log->newest_logentry;
if (!parent_log->oldest_logentry) {
parent_log->oldest_logentry = child_log->oldest_logentry;
}
parent_log->newest_logentry = child_log->newest_logentry;
parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
txn->parent->roll_info.rollentry_raw_count += txn->roll_info.rollentry_raw_count;
child_log->rollentry_resident_bytecount = 0;
}
if (parent_log->oldest_logentry==NULL) {
parent_log->oldest_logentry = child_log->oldest_logentry;
}
child_log->newest_logentry = child_log->oldest_logentry = 0;
// Put all the memarena data into the parent.
if (memarena_total_size_in_use(child_log->rollentry_arena) > 0) {
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
}
toku_rollback_log_unpin_and_remove(txn, child_log);
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
toku_maybe_spill_rollbacks(txn->parent, parent_log);
toku_rollback_log_unpin(txn->parent, parent_log);
assert(r == 0);
}
// Note the open brts, the omts must be merged
r = toku_omt_iterate(txn->open_fts, note_ft_used_in_txns_parent, txn);
assert(r==0);
// Merge the list of headers that must be checkpointed before commit
if (txn->checkpoint_needed_before_commit) {
txn->parent->checkpoint_needed_before_commit = TRUE;
}
//If this transaction needs an fsync (if it commits)
//save that in the parent. Since the commit really happens in the root txn.
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->roll_info.num_rollentries += txn->roll_info.num_rollentries;
} else {
r = apply_txn(txn, lsn, toku_commit_rollback_item);
assert(r==0);
}
return r;
}
int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
int r;
r = apply_txn(txn, lsn, toku_abort_rollback_item);
assert(r==0);
return r;
}
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ifndef ROLLBACK_APPLY_H
#define ROLLBACK_APPLY_H
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_rollback_commit(TOKUTXN txn, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, LSN lsn);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif // ROLLBACK_APPLY_H
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "rollback-ct-callbacks.h"
#include <toku_portability.h>
#include <memory.h>
#include "ft-internal.h"
#include "fttypes.h"
#include "memarena.h"
#include "rollback.h"
// Cleanup the rollback memory
static void
rollback_log_destroy(ROLLBACK_LOG_NODE log) {
memarena_close(&log->rollentry_arena);
toku_free(log);
}
// Write something out. Keep trying even if partial writes occur.
// On error: Return negative with errno set.
// On success return nbytes.
void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
int r;
ROLLBACK_LOG_NODE log = rollback_v;
FT h = extraargs;
assert(h->cf == cachefile);
assert(log->blocknum.b==logname.b);
if (write_me && !h->panic) {
int n_workitems, n_threads;
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, n_workitems, n_threads, for_checkpoint);
if (r) {
if (h->panic==0) {
char *e = strerror(r);
int l = 200 + strlen(e);
char s[l];
h->panic=r;
snprintf(s, l-1, "While writing data to disk, error %d (%s)", r, e);
h->panic_string = toku_strdup(s);
}
}
}
*new_size = size;
if (!keep_me) {
rollback_log_destroy(log);
}
}
int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r;
FT h = extraargs;
assert(h->cf == cachefile);
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
if (r==0) {
*sizep = rollback_memory_size(*result);
}
return r;
}
void toku_rollback_pe_est_callback(
void* rollback_v,
void* UU(disk_data),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
assert(rollback_v != NULL);
*bytes_freed_estimate = 0;
*cost = PE_CHEAP;
}
// callback for partially evicting a cachetable entry
int toku_rollback_pe_callback (
void *rollback_v,
PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr,
void* UU(extraargs)
)
{
assert(rollback_v != NULL);
*new_attr = old_attr;
return 0;
}
// partial fetch is never required for a rollback log node
BOOL toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
// a rollback node should never be partial fetched,
// because we always say it is not required.
// (pf req callback always returns false)
int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
assert(FALSE);
return 0;
}
// the cleaner thread should never choose a rollback node for cleaning
int toku_rollback_cleaner_callback (
void* UU(ftnode_pv),
BLOCKNUM UU(blocknum),
u_int32_t UU(fullhash),
void* UU(extraargs)
)
{
assert(FALSE);
return 0;
}
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ifndef ROLLBACK_CT_CALLBACKS_H
#define ROLLBACK_CT_CALLBACKS_H
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#include "cachetable.h"
#include "fttypes.h"
void toku_rollback_flush_callback(CACHEFILE cachefile, int fd, BLOCKNUM logname, void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone));
int toku_rollback_fetch_callback(CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash, void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs);
void toku_rollback_pe_est_callback(
void* rollback_v,
void* UU(disk_data),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
);
int toku_rollback_pe_callback (
void *rollback_v,
PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr,
void* UU(extraargs)
) ;
BOOL toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) ;
int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep));
int toku_rollback_cleaner_callback (
void* UU(ftnode_pv),
BLOCKNUM UU(blocknum),
u_int32_t UU(fullhash),
void* UU(extraargs)
);
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT h) {
CACHETABLE_WRITE_CALLBACK wc;
wc.flush_callback = toku_rollback_flush_callback;
wc.pe_est_callback = toku_rollback_pe_est_callback;
wc.pe_callback = toku_rollback_pe_callback;
wc.cleaner_callback = toku_rollback_cleaner_callback;
wc.clone_callback = NULL;
wc.write_extraargs = h;
return wc;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif // ROLLBACK_CT_CALLBACKS_H
......@@ -5,48 +5,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
static void
poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
if (txn->progress_poll_fun) {
TOKU_TXN_PROGRESS_S progress = {
.entries_total = txn->num_rollentries,
.entries_processed = txn->num_rollentries_processed,
.is_commit = is_commit,
.stalled_on_checkpoint = stall_for_checkpoint};
txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
}
}
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, TRUE, FALSE);
}
return r;
}
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, FALSE, FALSE);
}
return r;
}
static inline int
txn_has_current_rollback_log(TOKUTXN txn) {
return txn->current_rollback.b != ROLLBACK_NONE.b;
}
static inline int
txn_has_spilled_rollback_logs(TOKUTXN txn) {
return txn->spilled_rollback_tail.b != ROLLBACK_NONE.b;
}
#include "rollback-ct-callbacks.h"
static void rollback_unpin_remove_callback(CACHEKEY* cachekey, BOOL for_checkpoint, void* extra) {
FT h = extra;
......@@ -67,77 +26,6 @@ void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
assert(r == 0);
}
static int
apply_txn (TOKUTXN txn, LSN lsn,
apply_rollback_item func) {
int r = 0;
// do the commit/abort calls and free everything
// we do the commit/abort calls in reverse order too.
struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__);
BLOCKNUM next_log = ROLLBACK_NONE;
uint32_t next_log_hash = 0;
BOOL is_current = FALSE;
if (txn_has_current_rollback_log(txn)) {
next_log = txn->current_rollback;
next_log_hash = txn->current_rollback_hash;
is_current = TRUE;
}
else if (txn_has_spilled_rollback_logs(txn)) {
next_log = txn->spilled_rollback_tail;
next_log_hash = txn->spilled_rollback_tail_hash;
}
uint64_t last_sequence = txn->num_rollback_nodes;
BOOL found_head = FALSE;
while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log;
//pin log
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);
toku_maybe_prefetch_previous_rollback_log(txn, log);
last_sequence = log->sequence;
if (func) {
while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev;
r = func(txn, item, lsn);
if (r!=0) return r;
}
}
if (next_log.b == txn->spilled_rollback_head.b) {
assert(!found_head);
found_head = TRUE;
assert(log->sequence == 0);
}
next_log = log->previous;
next_log_hash = log->previous_hash;
{
//Clean up transaction structure to prevent
//toku_txn_close from double-freeing
if (is_current) {
txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0;
is_current = FALSE;
}
else {
txn->spilled_rollback_tail = next_log;
txn->spilled_rollback_tail_hash = next_log_hash;
}
if (found_head) {
assert(next_log.b == ROLLBACK_NONE.b);
txn->spilled_rollback_head = next_log;
txn->spilled_rollback_head_hash = next_log_hash;
}
}
toku_rollback_log_unpin_and_remove(txn, log);
}
return r;
}
int
toku_find_xid_by_xid (OMTVALUE v, void *xidv) {
TXNID xid = (TXNID) v;
......@@ -166,118 +54,6 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len)
return r;
}
static int note_ft_used_in_txns_parent(OMTVALUE hv, u_int32_t UU(index), void*txnv) {
TOKUTXN child = txnv;
TOKUTXN parent = child->parent;
FT h = hv;
int r = toku_txn_note_ft(parent, h);
if (r==0 &&
h->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
//Pass magic "no rollback needed" flag to parent.
h->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
}
if (r==0 &&
h->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
//Pass magic "no recovery needed" flag to parent.
h->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
}
return r;
}
//Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent.
int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
int r=0;
if (txn->parent!=0) {
// First we must put a rollinclude entry into the parent if we spilled
if (txn_has_spilled_rollback_logs(txn)) {
uint64_t num_nodes = txn->num_rollback_nodes;
if (txn_has_current_rollback_log(txn)) {
num_nodes--; //Don't count the in-progress rollback log.
}
r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
txn->spilled_rollback_head, txn->spilled_rollback_head_hash,
txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash);
if (r!=0) return r;
//Remove ownership from child.
txn->spilled_rollback_head = ROLLBACK_NONE;
txn->spilled_rollback_head_hash = 0;
txn->spilled_rollback_tail = ROLLBACK_NONE;
txn->spilled_rollback_tail_hash = 0;
}
// if we're commiting a child rollback, put its entries into the parent
// by pinning both child and parent and then linking the child log entry
// list to the end of the parent log entry list.
if (txn_has_current_rollback_log(txn)) {
//Pin parent log
ROLLBACK_LOG_NODE parent_log;
toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
//Pin child log
ROLLBACK_LOG_NODE child_log;
toku_get_and_pin_rollback_log(txn, txn->current_rollback,
txn->current_rollback_hash, &child_log);
toku_rollback_verify_contents(child_log, txn->txnid64, txn->num_rollback_nodes - 1);
// Append the list to the front of the parent.
if (child_log->oldest_logentry) {
// There are some entries, so link them in.
child_log->oldest_logentry->prev = parent_log->newest_logentry;
if (!parent_log->oldest_logentry) {
parent_log->oldest_logentry = child_log->oldest_logentry;
}
parent_log->newest_logentry = child_log->newest_logentry;
parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
txn->parent->rollentry_raw_count += txn->rollentry_raw_count;
child_log->rollentry_resident_bytecount = 0;
}
if (parent_log->oldest_logentry==NULL) {
parent_log->oldest_logentry = child_log->oldest_logentry;
}
child_log->newest_logentry = child_log->oldest_logentry = 0;
// Put all the memarena data into the parent.
if (memarena_total_size_in_use(child_log->rollentry_arena) > 0) {
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
}
toku_rollback_log_unpin_and_remove(txn, child_log);
txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0;
toku_maybe_spill_rollbacks(txn->parent, parent_log);
toku_rollback_log_unpin(txn->parent, parent_log);
assert(r == 0);
}
// Note the open brts, the omts must be merged
r = toku_omt_iterate(txn->open_fts, note_ft_used_in_txns_parent, txn);
assert(r==0);
// Merge the list of headers that must be checkpointed before commit
if (txn->checkpoint_needed_before_commit) {
txn->parent->checkpoint_needed_before_commit = TRUE;
}
//If this transaction needs an fsync (if it commits)
//save that in the parent. Since the commit really happens in the root txn.
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->num_rollentries += txn->num_rollentries;
} else {
r = apply_txn(txn, lsn, toku_commit_rollback_item);
assert(r==0);
}
return r;
}
int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
int r;
r = apply_txn(txn, lsn, toku_abort_rollback_item);
assert(r==0);
return r;
}
static inline PAIR_ATTR make_rollback_pair_attr(long size) {
PAIR_ATTR result={
.size = size,
......@@ -290,133 +66,13 @@ static inline PAIR_ATTR make_rollback_pair_attr(long size) {
return result;
}
// Write something out. Keep trying even if partial writes occur.
// On error: Return negative with errno set.
// On success return nbytes.
static PAIR_ATTR
PAIR_ATTR
rollback_memory_size(ROLLBACK_LOG_NODE log) {
size_t size = sizeof(*log);
size += memarena_total_memory_size(log->rollentry_arena);
return make_rollback_pair_attr(size);
}
// Cleanup the rollback memory
static void
rollback_log_destroy(ROLLBACK_LOG_NODE log) {
memarena_close(&log->rollentry_arena);
toku_free(log);
}
static void rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
int r;
ROLLBACK_LOG_NODE log = rollback_v;
FT h = extraargs;
assert(h->cf == cachefile);
assert(log->blocknum.b==logname.b);
if (write_me && !h->panic) {
int n_workitems, n_threads;
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, n_workitems, n_threads, for_checkpoint);
if (r) {
if (h->panic==0) {
char *e = strerror(r);
int l = 200 + strlen(e);
char s[l];
h->panic=r;
snprintf(s, l-1, "While writing data to disk, error %d (%s)", r, e);
h->panic_string = toku_strdup(s);
}
}
}
*new_size = size;
if (!keep_me) {
rollback_log_destroy(log);
}
}
static int rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r;
FT h = extraargs;
assert(h->cf == cachefile);
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
if (r==0) {
*sizep = rollback_memory_size(*result);
}
return r;
}
static void rollback_pe_est_callback(
void* rollback_v,
void* UU(disk_data),
long* bytes_freed_estimate,
enum partial_eviction_cost *cost,
void* UU(write_extraargs)
)
{
assert(rollback_v != NULL);
*bytes_freed_estimate = 0;
*cost = PE_CHEAP;
}
// callback for partially evicting a cachetable entry
static int rollback_pe_callback (
void *rollback_v,
PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr,
void* UU(extraargs)
)
{
assert(rollback_v != NULL);
*new_attr = old_attr;
return 0;
}
// partial fetch is never required for a rollback log node
static BOOL rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
// a rollback node should never be partial fetched,
// because we always say it is not required.
// (pf req callback always returns false)
static int rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
assert(FALSE);
return 0;
}
// the cleaner thread should never choose a rollback node for cleaning
static int rollback_cleaner_callback (
void* UU(ftnode_pv),
BLOCKNUM UU(blocknum),
u_int32_t UU(fullhash),
void* UU(extraargs)
)
{
assert(FALSE);
return 0;
}
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT h) {
CACHETABLE_WRITE_CALLBACK wc;
wc.flush_callback = rollback_flush_callback;
wc.pe_est_callback = rollback_pe_est_callback;
wc.pe_callback = rollback_pe_callback;
wc.cleaner_callback = rollback_cleaner_callback;
wc.clone_callback = NULL;
wc.write_extraargs = h;
return wc;
}
// create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log
// node the next sequence number
......@@ -433,7 +89,7 @@ static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previo
log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
log->dirty = TRUE;
log->txnid = txn->txnid64;
log->sequence = txn->num_rollback_nodes++;
log->sequence = txn->roll_info.num_rollback_nodes++;
toku_allocate_blocknum(h->blocktable, &log->blocknum, h);
log->hash = toku_cachetable_hash(cf, log->blocknum);
log->previous = previous;
......@@ -448,8 +104,8 @@ static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previo
log, rollback_memory_size(log),
get_write_callbacks_for_rollback_log(h));
assert(r == 0);
txn->current_rollback = log->blocknum;
txn->current_rollback_hash = log->hash;
txn->roll_info.current_rollback = log->blocknum;
txn->roll_info.current_rollback_hash = log->hash;
}
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
......@@ -466,19 +122,19 @@ void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
// Maybe there is no current after (if it spilled)
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
assert(log->blocknum.b == txn->current_rollback.b);
assert(log->blocknum.b == txn->roll_info.current_rollback.b);
//spill
if (!txn_has_spilled_rollback_logs(txn)) {
//First spilled. Copy to head.
txn->spilled_rollback_head = txn->current_rollback;
txn->spilled_rollback_head_hash = txn->current_rollback_hash;
txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
txn->roll_info.spilled_rollback_head_hash = txn->roll_info.current_rollback_hash;
}
//Unconditionally copy to tail. Old tail does not need to be cached anymore.
txn->spilled_rollback_tail = txn->current_rollback;
txn->spilled_rollback_tail_hash = txn->current_rollback_hash;
txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;
txn->roll_info.spilled_rollback_tail_hash = txn->roll_info.current_rollback_hash;
txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0;
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
}
}
......@@ -493,20 +149,31 @@ static int find_filenum (OMTVALUE v, void *hv) {
}
//Notify a transaction that it has touched a brt.
int toku_txn_note_ft (TOKUTXN txn, FT h) {
BOOL ref_added = toku_ft_maybe_add_txn_ref(h, txn);
// Insert reference to brt into transaction
if (ref_added) {
int r = toku_omt_insert(txn->open_fts, h, find_filenum, h, 0);
assert(r==0);
}
return 0;
void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
toku_txn_lock(txn);
OMTVALUE ftv;
uint32_t idx;
int r = toku_omt_find_zero(txn->open_fts, find_filenum, ft, &ftv, &idx);
if (r == 0) {
// already there
assert((FT) ftv == ft);
goto exit;
}
r = toku_omt_insert_at(txn->open_fts, ft, idx);
assert_zero(r);
// TODO(leif): if there's anything that locks the reflock and then
// the txn lock, this may deadlock, because it grabs the reflock.
toku_ft_add_txn_ref(ft, txn);
exit:
toku_txn_unlock(txn);
}
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
{
*raw_count = txn->rollentry_raw_count;
toku_txn_lock(txn);
*raw_count = txn->roll_info.rollentry_raw_count;
toku_txn_unlock(txn);
return 0;
}
......@@ -522,9 +189,9 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
BOOL doing_prefetch = FALSE;
r = toku_cachefile_prefetch(cf, name, hash,
get_write_callbacks_for_rollback_log(h),
rollback_fetch_callback,
rollback_pf_req_callback,
rollback_pf_callback,
toku_rollback_fetch_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
h,
&doing_prefetch);
assert(r == 0);
......@@ -545,9 +212,9 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
int r = toku_cachetable_get_and_pin(cf, blocknum, hash,
&value, NULL,
get_write_callbacks_for_rollback_log(h),
rollback_fetch_callback,
rollback_pf_req_callback,
rollback_pf_callback,
toku_rollback_fetch_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
TRUE, // may_modify_value
h
);
......@@ -561,15 +228,14 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
ROLLBACK_LOG_NODE pinned_log;
invariant(txn->state == TOKUTXN_LIVE); // #3258
if (txn_has_current_rollback_log(txn)) {
toku_get_and_pin_rollback_log(txn, txn->current_rollback, txn->current_rollback_hash, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->num_rollback_nodes - 1);
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
} else {
// create a new log for this transaction to use.
// create a new log for this transaction to use.
// this call asserts success internally
rollback_log_create(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &pinned_log);
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
}
assert(pinned_log->txnid == txn->txnid64);
assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
*log = pinned_log;
}
......@@ -14,8 +14,6 @@ extern "C" {
#endif
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, LSN lsn);
// these functions assert internally that they succeed
......@@ -38,10 +36,6 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
// unpin and rmove a rollback log from the cachetable
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
......@@ -56,12 +50,14 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
// if necessary.
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_txn_note_ft (TOKUTXN txn, FT h);
void toku_txn_maybe_note_ft (TOKUTXN txn, FT h);
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count);
int toku_find_pair_by_xid (OMTVALUE v, void *txnv);
int toku_find_xid_by_xid (OMTVALUE v, void *xidv);
PAIR_ATTR rollback_memory_size(ROLLBACK_LOG_NODE log);
// A high-level rollback log is made up of a chain of rollback log nodes.
// Each rollback log node is represented (separately) in the cachetable by
// this structure. Each portion of the rollback log chain has a block num
......
......@@ -10,6 +10,7 @@
#include "checkpoint.h"
#include "ule.h"
#include <valgrind/helgrind.h>
#include "rollback-apply.h"
#include "txn_manager.h"
///////////////////////////////////////////////////////////////////////////////////
......@@ -50,6 +51,18 @@ toku_txn_get_status(TXN_STATUS s) {
*s = txn_status;
}
void
toku_txn_lock(TOKUTXN txn)
{
toku_mutex_lock(&txn->txn_lock);
}
void
toku_txn_unlock(TOKUTXN txn)
{
toku_mutex_unlock(&txn->txn_lock);
}
int
toku_txn_begin_txn (
DB_TXN *container_db_txn,
......@@ -59,7 +72,7 @@ toku_txn_begin_txn (
TXN_SNAPSHOT_TYPE snapshot_type
)
{
int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn);
int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn, false);
return r;
}
......@@ -70,14 +83,11 @@ toku_txn_begin_with_xid (
TOKULOGGER logger,
TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn
DB_TXN *container_db_txn,
bool for_recovery
)
{
int r = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn);
if (r == 0) {
toku_txn_manager_start_txn((*tokutxn)->logger->txn_manager, *tokutxn);
}
return r;
return toku_txn_manager_start_txn(tokutxn, logger->txn_manager, parent_tokutxn, logger, xid, snapshot_type, container_db_txn, for_recovery);
}
DB_TXN *
......@@ -95,52 +105,78 @@ static void invalidate_xa_xid (TOKU_XA_XID *xid) {
xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
}
int
int
toku_txn_create_txn (
TOKUTXN *tokutxn,
TOKUTXN parent_tokutxn,
TOKULOGGER logger,
TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn
XIDS xids,
DB_TXN *container_db_txn,
bool for_checkpoint
)
{
if (logger->is_panicked) return EINVAL;
if (logger->is_panicked) {
return EINVAL;
}
assert(logger->rollback_cachefile);
TOKUTXN XMALLOC(result);
result->starttime = time(NULL); // getting timestamp in seconds is a cheap call
int r;
r = toku_omt_create(&result->open_fts);
assert_zero(r);
result->logger = logger;
result->parent = parent_tokutxn;
result->num_rollentries = 0;
result->num_rollentries_processed = 0;
result->progress_poll_fun = NULL;
result->progress_poll_fun_extra = NULL;
result->spilled_rollback_head = ROLLBACK_NONE;
result->spilled_rollback_tail = ROLLBACK_NONE;
result->spilled_rollback_head_hash = 0;
result->spilled_rollback_tail_hash = 0;
result->current_rollback = ROLLBACK_NONE;
result->current_rollback_hash = 0;
result->num_rollback_nodes = 0;
result->snapshot_type = snapshot_type;
result->snapshot_txnid64 = TXNID_NONE;
result->container_db_txn = container_db_txn;
result->rollentry_raw_count = 0;
result->force_fsync_on_commit = FALSE;
result->recovered_from_checkpoint = FALSE;
result->checkpoint_needed_before_commit = FALSE;
result->state = TOKUTXN_LIVE;
invalidate_xa_xid(&result->xa_xid);
result->do_fsync = FALSE;
TXNID snapshot_txnid64;
if (snapshot_type == TXN_SNAPSHOT_NONE) {
snapshot_txnid64 = TXNID_NONE;
} else if (parent_tokutxn == NULL || snapshot_type == TXN_SNAPSHOT_CHILD) {
snapshot_txnid64 = xid;
} else if (snapshot_type == TXN_SNAPSHOT_ROOT) {
snapshot_txnid64 = parent_tokutxn->snapshot_txnid64;
} else {
assert(false);
}
result->txnid64 = xid;
result->xids = NULL;
OMT open_fts;
{
int r = toku_omt_create(&open_fts);
assert_zero(r);
}
struct txn_roll_info roll_info = {
.num_rollback_nodes = 0,
.num_rollentries = 0,
.num_rollentries_processed = 0,
.rollentry_raw_count = 0,
.spilled_rollback_head = ROLLBACK_NONE,
.spilled_rollback_tail = ROLLBACK_NONE,
.spilled_rollback_head_hash = 0,
.spilled_rollback_tail_hash = 0,
.current_rollback = ROLLBACK_NONE,
.current_rollback_hash = 0,
};
struct tokutxn new_txn = {
.starttime = time(NULL),
.open_fts = open_fts,
.logger = logger,
.parent = parent_tokutxn,
.progress_poll_fun = NULL,
.progress_poll_fun_extra = NULL,
.snapshot_type = snapshot_type,
.snapshot_txnid64 = snapshot_txnid64,
.container_db_txn = container_db_txn,
.force_fsync_on_commit = FALSE,
.recovered_from_checkpoint = for_checkpoint,
.checkpoint_needed_before_commit = FALSE,
.state = TOKUTXN_LIVE,
.do_fsync = FALSE,
.txnid64 = xid,
.ancestor_txnid64 = (parent_tokutxn ? parent_tokutxn->ancestor_txnid64 : xid),
.xids = xids,
.roll_info = roll_info
};
TOKUTXN result = toku_xmemdup(&new_txn, sizeof new_txn);
toku_mutex_init(&result->txn_lock, NULL);
invalidate_xa_xid(&result->xa_xid);
*tokutxn = result;
STATUS_VALUE(TXN_BEGIN)++;
......@@ -154,31 +190,27 @@ toku_txn_create_txn (
//Used on recovery to recover a transaction.
int
toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
#define COPY_FROM_INFO(field) txn->field = info->field
COPY_FROM_INFO(rollentry_raw_count);
txn->roll_info.rollentry_raw_count = info->rollentry_raw_count;
uint32_t i;
for (i = 0; i < info->num_fts; i++) {
FT h = info->open_fts[i];
int r = toku_txn_note_ft(txn, h);
assert_zero(r);
FT ft = info->open_fts[i];
toku_txn_maybe_note_ft(txn, ft);
}
COPY_FROM_INFO(force_fsync_on_commit );
COPY_FROM_INFO(num_rollback_nodes);
COPY_FROM_INFO(num_rollentries);
txn->force_fsync_on_commit = info->force_fsync_on_commit;
txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
txn->roll_info.num_rollentries = info->num_rollentries;
CACHEFILE rollback_cachefile = txn->logger->rollback_cachefile;
COPY_FROM_INFO(spilled_rollback_head);
txn->spilled_rollback_head_hash = toku_cachetable_hash(rollback_cachefile,
txn->spilled_rollback_head);
COPY_FROM_INFO(spilled_rollback_tail);
txn->spilled_rollback_tail_hash = toku_cachetable_hash(rollback_cachefile,
txn->spilled_rollback_tail);
COPY_FROM_INFO(current_rollback);
txn->current_rollback_hash = toku_cachetable_hash(rollback_cachefile,
txn->current_rollback);
#undef COPY_FROM_INFO
txn->recovered_from_checkpoint = TRUE;
txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
txn->roll_info.spilled_rollback_head_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.spilled_rollback_head);
txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
txn->roll_info.spilled_rollback_tail_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.spilled_rollback_tail);
txn->roll_info.current_rollback = info->current_rollback;
txn->roll_info.current_rollback_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.current_rollback);
return 0;
}
......@@ -223,7 +255,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
// recovery to properly recommit this transaction if the commit
// does not make it to disk. In the case of MySQL, that would be the
// binary log.
txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->num_rollentries>0));
txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0));
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
......@@ -274,7 +306,7 @@ int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
if (txn->parent) return 0; // nothing to do if there's a parent.
toku_txn_manager_add_prepared_txn(txn->logger->txn_manager, txn);
// Do we need to do an fsync?
txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0);
txn->do_fsync = (txn->force_fsync_on_commit || txn->roll_info.num_rollentries>0);
copy_xid(&txn->xa_xid, xa_xid);
// This list will go away with #4683, so we wn't need the ydb lock for this anymore.
return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid);
......@@ -350,9 +382,9 @@ static void note_txn_closing (TOKUTXN txn) {
}
void toku_txn_complete_txn(TOKUTXN txn) {
assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->current_rollback.b == ROLLBACK_NONE.b);
assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b);
assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b);
toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
// note that here is another place we depend on
// this function being called with the multi operation lock
......@@ -364,6 +396,7 @@ void toku_txn_destroy_txn(TOKUTXN txn) {
toku_omt_destroy(&txn->open_fts);
}
xids_destroy(&txn->xids);
toku_mutex_destroy(&txn->txn_lock);
toku_free(txn);
STATUS_VALUE(TXN_CLOSE)++;
......
......@@ -11,6 +11,9 @@
extern "C" {
#endif
void toku_txn_lock(TOKUTXN txn);
void toku_txn_unlock(TOKUTXN txn);
int toku_txn_begin_txn (
DB_TXN *container_db_txn,
TOKUTXN parent_tokutxn,
......@@ -29,11 +32,12 @@ int toku_txn_begin_with_xid (
TOKULOGGER logger,
TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn
DB_TXN *container_db_txn,
bool for_recovery
);
// Allocate and initialize a txn
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn);
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, XIDS xids, DB_TXN *container_db_txn, bool for_checkpoint);
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
......
......@@ -305,8 +305,16 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
return r;
}
void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
TOKUTXN parent = txn->parent;
int toku_txn_manager_start_txn(
TOKUTXN *txnp,
TXN_MANAGER txn_manager,
TOKUTXN parent,
TOKULOGGER logger,
TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
bool for_recovery)
{
int r;
// we take the txn_manager_lock before writing to the log
// we may be able to move this lock acquisition
......@@ -316,20 +324,28 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
}
if (txn->txnid64 == TXNID_NONE) {
if (xid == TXNID_NONE) {
LSN first_lsn;
r = toku_log_xbegin(txn->logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
assert_zero(r);
txn->txnid64 = first_lsn.lsn;
xid = first_lsn.lsn;
}
XIDS parent_xids;
if (parent == NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent->xids;
r = xids_create_child(parent_xids, &txn->xids, txn->txnid64);
XIDS xids;
r = xids_create_child(parent_xids, &xids, xid);
assert_zero(r);
TOKUTXN txn;
r = toku_txn_create_txn(&txn, parent, logger, xid, snapshot_type, xids, container_db_txn, for_recovery);
if (r != 0) {
// logger is panicked
return r;
}
if (toku_omt_size(txn_manager->live_txns) == 0) {
assert(txn_manager->oldest_living_xid == TXNID_NONE_LIVING);
txn_manager->oldest_living_xid = txn->txnid64;
......@@ -367,10 +383,6 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
toku_omt_size(txn_manager->live_root_txns)
); //We know it is the newest one.
assert_zero(r);
txn->ancestor_txnid64 = txn->txnid64;
}
else {
txn->ancestor_txnid64 = parent->ancestor_txnid64;
}
// setup information for snapshot reads
......@@ -380,7 +392,6 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
r = setup_live_root_txn_list(txn_manager, txn);
assert_zero(r);
txn->snapshot_txnid64 = txn->txnid64;
r = snapshot_txnids_note_txn(txn_manager, txn);
assert_zero(r);
r = live_list_reverse_note_txn_start(txn);
......@@ -390,7 +401,6 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
// of the root transaction
else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
txn->live_root_txn_list = parent->live_root_txn_list;
txn->snapshot_txnid64 = parent->snapshot_txnid64;
}
else {
assert(FALSE);
......@@ -401,6 +411,9 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
verify_snapshot_system(txn_manager);
}
toku_mutex_unlock(&txn_manager->txn_manager_lock);
*txnp = txn;
return 0;
}
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
......
......@@ -34,7 +34,16 @@ void toku_txn_manager_destroy(TXN_MANAGER txn_manager);
TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager, time_t * oldest_living_starttime);
// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
// Also, create the txn.
int toku_txn_manager_start_txn(
TOKUTXN *txnp,
TXN_MANAGER txn_manager,
TOKUTXN parent,
TOKULOGGER logger,
TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
bool for_recovery);
void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_clone_state_for_gc(
......
......@@ -520,7 +520,7 @@ maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN
const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
r = toku_db_put(persistent_environment, txn, &key, &val, 0, TRUE);
r = toku_db_put(persistent_environment, txn, &key, &val, 0, FALSE);
assert_zero(r);
// although the variable name is last_lsn_of_v13, this key really represents
......@@ -529,19 +529,19 @@ maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN
uint64_t last_lsn_of_v13_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
toku_fill_dbt(&key, last_lsn_of_v13_key, strlen(last_lsn_of_v13_key));
toku_fill_dbt(&val, &last_lsn_of_v13_d, sizeof(last_lsn_of_v13_d));
r = toku_db_put(persistent_environment, txn, &key, &val, 0, TRUE);
r = toku_db_put(persistent_environment, txn, &key, &val, 0, FALSE);
assert_zero(r);
time_t upgrade_v19_time_d = toku_htod64(time(NULL));
toku_fill_dbt(&key, upgrade_v19_time_key, strlen(upgrade_v19_time_key));
toku_fill_dbt(&val, &upgrade_v19_time_d, sizeof(upgrade_v19_time_d));
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, TRUE);
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, FALSE);
assert_zero(r);
uint64_t upgrade_v19_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
toku_fill_dbt(&key, upgrade_v19_footprint_key, strlen(upgrade_v19_footprint_key));
toku_fill_dbt(&val, &upgrade_v19_footprint_d, sizeof(upgrade_v19_footprint_d));
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, TRUE);
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, FALSE);
assert_zero(r);
}
return r;
......@@ -959,18 +959,18 @@ env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, TRUE);
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, FALSE);
assert_zero(r);
toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, TRUE);
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, FALSE);
assert_zero(r);
time_t creation_time_d = toku_htod64(time(NULL));
toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, TRUE);
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, FALSE);
assert_zero(r);
}
else {
......
......@@ -488,21 +488,18 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
break;
}
}
r = toku_txn_create_txn(&db_txn_struct_i(result)->tokutxn,
stxn ? db_txn_struct_i(stxn)->tokutxn : 0,
env->i->logger,
TXNID_NONE,
snapshot_type,
result
);
r = toku_txn_manager_start_txn(&db_txn_struct_i(result)->tokutxn,
toku_logger_get_txn_manager(env->i->logger),
stxn ? db_txn_struct_i(stxn)->tokutxn : 0,
env->i->logger,
TXNID_NONE,
snapshot_type,
result,
false);
if (r != 0) {
toku_free(result);
return r;
}
toku_txn_manager_start_txn(
toku_logger_get_txn_manager(env->i->logger),
db_txn_struct_i(result)->tokutxn
);
//Add to the list of children for the parent.
if (result->parent) {
......
......@@ -8,6 +8,7 @@
#include "ydb-internal.h"
#include "indexer.h"
#include <ft/log_header.h>
#include <ft/checkpoint.h>
#include "ydb_row_lock.h"
#include "ydb_write.h"
#include "ydb_db.h"
......@@ -125,7 +126,7 @@ db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key,
int
toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock) {
toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_mo_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
......@@ -152,9 +153,9 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock)
}
if (r == 0) {
//Do the actual deleting.
if (!holds_ydb_lock) toku_ydb_lock();
if (!holds_mo_lock) toku_multi_operation_client_lock();
r = toku_ft_delete(db->i->ft_handle, key, txn ? db_txn_struct_i(txn)->tokutxn : 0);
if (!holds_ydb_lock) toku_ydb_unlock();
if (!holds_mo_lock) toku_multi_operation_client_unlock();
}
if (r == 0) {
......@@ -168,7 +169,7 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock)
int
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_ydb_lock) {
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_mo_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
int r = 0;
......@@ -193,9 +194,9 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds
if (flags==DB_NOOVERWRITE_NO_ERROR) {
type = FT_INSERT_NO_OVERWRITE;
}
if (!holds_ydb_lock) toku_ydb_lock();
if (!holds_mo_lock) toku_multi_operation_client_lock();
r = toku_ft_maybe_insert(db->i->ft_handle, key, val, ttxn, FALSE, ZERO_LSN, TRUE, type);
if (!holds_ydb_lock) toku_ydb_unlock();
if (!holds_mo_lock) toku_multi_operation_client_unlock();
}
if (r == 0) {
......@@ -232,10 +233,10 @@ toku_db_update(DB *db, DB_TXN *txn,
}
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
toku_ydb_lock();
toku_multi_operation_client_lock();
r = toku_ft_maybe_update(db->i->ft_handle, key, update_function_extra, ttxn,
FALSE, ZERO_LSN, TRUE);
toku_ydb_unlock();
toku_multi_operation_client_unlock();
cleanup:
if (r == 0)
......@@ -287,10 +288,10 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn,
}
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
toku_ydb_lock();
toku_multi_operation_client_lock();
r = toku_ft_maybe_update_broadcast(db->i->ft_handle, update_function_extra, ttxn,
FALSE, ZERO_LSN, TRUE, is_resetting_op);
toku_ydb_unlock();
toku_multi_operation_client_unlock();
cleanup:
if (r == 0)
......
......@@ -35,8 +35,8 @@ typedef struct {
void ydb_write_layer_get_status(YDB_WRITE_LAYER_STATUS statp);
int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock);
int toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_ydb_lock);
int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_mo_lock);
int toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_mo_lock);
int autotxn_db_del(DB* db, DB_TXN* txn, DBT* key, u_int32_t flags);
int autotxn_db_put(DB* db, DB_TXN* txn, DBT* key, DBT* data, u_int32_t flags);
int autotxn_db_update(DB *db, DB_TXN *txn, const DBT *key, const DBT *update_function_extra, u_int32_t flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment