Commit 3608dcd1 authored by Yoni Fogel

closes #5149 Skips taking the multi-operation (MO) lock for read-only transactions on commit, abort, and prepare

git-svn-id: file:///svn/toku/tokudb@44989 c7de825b-a66e-492c-adef-691d508d4ae1
parent 98d239de
...@@ -287,18 +287,12 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn, ...@@ -287,18 +287,12 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
txn->progress_poll_fun = poll; txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra; txn->progress_poll_fun_extra = poll_extra;
if (txn->begin_was_logged) { if (!toku_txn_is_read_only(txn)) {
r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64); r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
} }
else {
// Did no work.
invariant(txn->roll_info.num_rollentries == 0);
// Was not prepared.
invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
}
// If !txn->begin_was_logged, we could skip toku_rollback_commit // If !txn->begin_was_logged, we could skip toku_rollback_commit
// but it's cheap (only a number of function calls that return immediately) // but it's cheap (only a number of function calls that return immediately)
// since there were no writes. Skipping it would mean we would need to be careful // since there were no writes. Skipping it would mean we would need to be careful
...@@ -328,18 +322,12 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn, ...@@ -328,18 +322,12 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
int r; int r;
txn->do_fsync = FALSE; txn->do_fsync = FALSE;
if (txn->begin_was_logged) { if (!toku_txn_is_read_only(txn)) {
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64); r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
} }
else {
// Did no work.
invariant(txn->roll_info.num_rollentries == 0);
// Was not prepared.
invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
}
// If !txn->begin_was_logged, we could skip toku_rollback_abort // If !txn->begin_was_logged, we could skip toku_rollback_abort
// but it's cheap (only a number of function calls that return immediately) // but it's cheap (only a number of function calls that return immediately)
// since there were no writes. Skipping it would mean we would need to be careful // since there were no writes. Skipping it would mean we would need to be careful
...@@ -360,7 +348,7 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) { ...@@ -360,7 +348,7 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) { int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
int r = 0; int r = 0;
if (txn->parent || !txn->begin_was_logged) { if (txn->parent || toku_txn_is_read_only(txn)) {
// nothing to do if there's a parent, or if it's read-only // nothing to do if there's a parent, or if it's read-only
goto cleanup; goto cleanup;
} }
...@@ -526,6 +514,19 @@ toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) { ...@@ -526,6 +514,19 @@ toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) {
toku_txn_unlock(txn); toku_txn_unlock(txn);
} }
/*
 * Report whether txn is read-only, meaning its begin record was never
 * logged (txn->begin_was_logged is false).
 *
 * When the txn is read-only, this also checks via invariant() that it has
 * no rollback entries, that do_fsync_lsn still equals ZERO_LSN, and that
 * its open_fts OMT is empty.
 *
 * Returns true for a read-only txn, false otherwise.
 */
bool
toku_txn_is_read_only(TOKUTXN txn) {
    if (txn->begin_was_logged) {
        // A begin record was logged, so the txn is not treated as read-only.
        return false;
    }
    // No rollback entries: the txn did no work.
    invariant(txn->roll_info.num_rollentries == 0);
    // The fsync LSN is still zero: the txn was never prepared.
    invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
    // The open_fts OMT must be empty as well.
    invariant(toku_omt_size(txn->open_fts) == 0);
    return true;
}
#include <valgrind/helgrind.h> #include <valgrind/helgrind.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void); void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void void
......
...@@ -114,6 +114,7 @@ struct tokulogger_preplist { ...@@ -114,6 +114,7 @@ struct tokulogger_preplist {
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags); int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags);
void toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn); void toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn);
bool toku_txn_is_read_only(TOKUTXN txn);
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
} }
......
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
#include "ft/txn_manager.h" #include "ft/txn_manager.h"
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION poll, static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION poll,
void *poll_extra, bool release_multi_operation_client_lock); void *poll_extra, bool release_mo_client_lock_if_held, bool *holds_mo_lock);
static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool *holds_mo_lock);
static int static int
toku_txn_release_locks(DB_TXN* txn) { toku_txn_release_locks(DB_TXN* txn) {
...@@ -57,14 +58,22 @@ toku_txn_destroy(DB_TXN *txn) { ...@@ -57,14 +58,22 @@ toku_txn_destroy(DB_TXN *txn) {
} }
static int static int
toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock) { bool release_mo_lock_if_held,
bool *holds_mo_lock) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
// Don't take multi-operation lock until we see a non-readonly txn.
if (!*holds_mo_lock &&
!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) {
toku_multi_operation_client_lock();
*holds_mo_lock = true;
}
//Recursively kill off children //Recursively kill off children
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL, false); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child,
flags, NULL, NULL, false, holds_mo_lock);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n");
} }
...@@ -116,19 +125,15 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -116,19 +125,15 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
// this lock must be held until the references to the open FTs is released // this lock must be held until the references to the open FTs is released
// begin checkpoint logs these associations, so we must be protect // begin checkpoint logs these associations, so we must be protect
// the changing of these associations with checkpointing // the changing of these associations with checkpointing
if (release_multi_operation_client_lock) { if (release_mo_lock_if_held && *holds_mo_lock) {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync); toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync);
if (flags!=0) return EINVAL; if (flags!=0) {
return r; r = EINVAL;
} goto cleanup;
}
static int cleanup:
toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock) {
int r = toku_txn_commit_only(txn, flags, poll, poll_extra, release_multi_operation_client_lock);
toku_txn_destroy(txn); toku_txn_destroy(txn);
return r; return r;
} }
...@@ -148,13 +153,21 @@ toku_txn_id64(DB_TXN * txn) { ...@@ -148,13 +153,21 @@ toku_txn_id64(DB_TXN * txn) {
} }
static int static int
toku_txn_abort_only(DB_TXN * txn, toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool *holds_mo_lock) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
// Don't take multi-operation lock until we see a non-readonly txn.
if (!*holds_mo_lock &&
!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) {
toku_multi_operation_client_lock();
*holds_mo_lock = true;
}
//Recursively kill off children (abort or commit are both correct, commit is cheaper) //Recursively kill off children (abort or commit are both correct, commit is cheaper)
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL, false); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child,
DB_TXN_NOSYNC, NULL, NULL, false, holds_mo_lock);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n");
} }
...@@ -176,29 +189,34 @@ toku_txn_abort_only(DB_TXN * txn, ...@@ -176,29 +189,34 @@ toku_txn_abort_only(DB_TXN * txn,
assert_zero(r); assert_zero(r);
toku_txn_complete_txn(db_txn_struct_i(txn)->tokutxn); toku_txn_complete_txn(db_txn_struct_i(txn)->tokutxn);
r = toku_txn_release_locks(txn); r = toku_txn_release_locks(txn);
toku_txn_destroy(txn);
return r; return r;
} }
// requires: must hold the multi operation lock. it is
// released here before the fsync.
static int static int
toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
int r = 0; int r = 0;
if (!txn) { if (!txn) {
toku_multi_operation_client_unlock();
r = EINVAL; r = EINVAL;
goto exit; goto exit;
} }
if (txn->parent) { if (txn->parent) {
toku_multi_operation_client_unlock();
r = 0; // make this a NO-OP, MySQL calls this r = 0; // make this a NO-OP, MySQL calls this
goto exit; goto exit;
} }
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
// Take the mo lock as soon as a non-readonly txn is found
bool holds_mo_lock = false;
if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) {
toku_multi_operation_client_lock();
holds_mo_lock = true;
}
//Recursively commit any children. //Recursively commit any children.
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, 0, NULL, NULL, false);
// toku_txn_commit will take the mo_lock if not held and a non-readonly txn is found.
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, 0, NULL, NULL, false, &holds_mo_lock);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n");
} }
...@@ -213,7 +231,9 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { ...@@ -213,7 +231,9 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
bool do_fsync; bool do_fsync;
toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn); toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn);
// release the multi operation lock before fsyncing the log // release the multi operation lock before fsyncing the log
if (holds_mo_lock) {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
}
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync); toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync);
exit: exit:
return r; return r;
...@@ -232,14 +252,6 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { ...@@ -232,14 +252,6 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
return toku_txn_xa_prepare(txn, &xid); return toku_txn_xa_prepare(txn, &xid);
} }
static int
toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
int r = toku_txn_abort_only(txn, poll, poll_extra);
toku_txn_destroy(txn);
return r;
}
static u_int32_t static u_int32_t
locked_txn_id(DB_TXN *txn) { locked_txn_id(DB_TXN *txn) {
u_int32_t r = toku_txn_id(txn); u_int32_t r = toku_txn_id(txn);
...@@ -265,11 +277,13 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags, ...@@ -265,11 +277,13 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
if (toku_txn_requires_checkpoint(ttxn)) { if (toku_txn_requires_checkpoint(ttxn)) {
toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT); toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT);
} }
// cannot begin a checkpoint. the multi operation lock is taken here, bool holds_mo_lock = false;
// but released in toku_txn_commit_only. this way, we don't hold it // cannot begin a checkpoint.
// while we fsync the log. // the multi operation lock is taken the first time we
toku_multi_operation_client_lock(); // see a non-readonly txn in the recursive commit.
int r = toku_txn_commit(txn, flags, poll, poll_extra, true); // But released in the first-level toku_txn_commit (if taken),
// this way, we don't hold it while we fsync the log.
int r = toku_txn_commit(txn, flags, poll, poll_extra, true, &holds_mo_lock);
return r; return r;
} }
...@@ -277,9 +291,14 @@ static int ...@@ -277,9 +291,14 @@ static int
locked_txn_abort_with_progress(DB_TXN *txn, locked_txn_abort_with_progress(DB_TXN *txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
// cannot begin a checkpoint // cannot begin a checkpoint
toku_multi_operation_client_lock(); // the multi operation lock is taken the first time we
int r = toku_txn_abort(txn, poll, poll_extra); // see a non-readonly txn in the abort (or recursive commit).
// But released here so we don't have to hold additional state.
bool holds_mo_lock = false;
int r = toku_txn_abort(txn, poll, poll_extra, &holds_mo_lock);
if (holds_mo_lock) {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
}
return r; return r;
} }
...@@ -295,22 +314,21 @@ locked_txn_abort(DB_TXN *txn) { ...@@ -295,22 +314,21 @@ locked_txn_abort(DB_TXN *txn) {
return r; return r;
} }
static int static inline void
locked_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { txn_func_init(DB_TXN *txn) {
// toku_txn_prepare eventually releases the multi operation lock #define STXN(name) txn->name = locked_txn_ ## name
// before fsyncing the log STXN(abort);
toku_multi_operation_client_lock(); STXN(commit);
int r = toku_txn_prepare(txn, gid); STXN(abort_with_progress);
return r; STXN(commit_with_progress);
} STXN(id);
STXN(txn_stat);
static int #undef STXN
locked_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { #define SUTXN(name) txn->name = toku_txn_ ## name
// toku_txn_xa_prepare eventually releases the multi operation lock SUTXN(prepare);
// before fsyncing the log SUTXN(xa_prepare);
toku_multi_operation_client_lock(); #undef SUTXN
int r = toku_txn_xa_prepare(txn, xid); txn->id64 = toku_txn_id64;
return r;
} }
int int
...@@ -398,17 +416,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { ...@@ -398,17 +416,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
//toku_ydb_notef("parent=%p flags=0x%x\n", stxn, flags); //toku_ydb_notef("parent=%p flags=0x%x\n", stxn, flags);
result->mgrp = env; result->mgrp = env;
#define STXN(name) result->name = locked_txn_ ## name txn_func_init(result);
STXN(abort);
STXN(commit);
STXN(abort_with_progress);
STXN(commit_with_progress);
STXN(id);
STXN(prepare);
STXN(xa_prepare);
STXN(txn_stat);
#undef STXN
result->id64 = toku_txn_id64;
result->parent = stxn; result->parent = stxn;
#if !TOKUDB_NATIVE_H #if !TOKUDB_NATIVE_H
...@@ -477,16 +485,7 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) { ...@@ -477,16 +485,7 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) {
memset(eresult, 0, sizeof(*eresult)); memset(eresult, 0, sizeof(*eresult));
DB_TXN *result = &eresult->external_part; DB_TXN *result = &eresult->external_part;
result->mgrp = env; result->mgrp = env;
#define STXN(name) result->name = locked_txn_ ## name txn_func_init(result);
STXN(abort);
STXN(commit);
STXN(abort_with_progress);
STXN(commit_with_progress);
STXN(id);
STXN(prepare);
STXN(txn_stat);
#undef STXN
result->id64 = toku_txn_id64;
result->parent = NULL; result->parent = NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment