Commit c87ae300 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5554, merge to main

git-svn-id: file:///svn/toku/tokudb@51911 c7de825b-a66e-492c-adef-691d508d4ae1
parent 514a9bbc
......@@ -249,6 +249,7 @@ static void print_defines (void) {
#endif
dodefine_from_track(txn_flags, DB_INHERIT_ISOLATION);
dodefine_from_track(txn_flags, DB_SERIALIZABLE);
dodefine_from_track(txn_flags, DB_TXN_READ_ONLY);
}
/* TOKUDB specific error codes*/
......
......@@ -3905,7 +3905,10 @@ static int
does_txn_read_entry(TXNID id, TOKUTXN context) {
int rval;
TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(context);
if (id < oldest_live_in_snapshot || id == context->txnid.parent_id64) {
if (oldest_live_in_snapshot == TXNID_NONE && id < context->snapshot_txnid64) {
rval = TOKUDB_ACCEPT;
}
else if (id < oldest_live_in_snapshot || id == context->txnid.parent_id64) {
rval = TOKUDB_ACCEPT;
}
else if (id > context->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*context->live_root_txn_list, id)) {
......
......@@ -412,6 +412,7 @@ generate_log_writer (void) {
fprintf(cf, " //txn can be NULL during tests\n");
fprintf(cf, " //never null when not checkpoint.\n");
fprintf(cf, " if (txn && !txn->begin_was_logged) {\n");
fprintf(cf, " invariant(!txn_declared_read_only(txn));\n");
fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n");
fprintf(cf, " }\n");
break;
......@@ -419,6 +420,7 @@ generate_log_writer (void) {
case ASSERT_BEGIN_WAS_LOGGED: {
fprintf(cf, " //txn can be NULL during tests\n");
fprintf(cf, " invariant(!txn || txn->begin_was_logged);\n");
fprintf(cf, " invariant(!txn || !txn_declared_read_only(txn));\n");
break;
}
case IGNORE_LOG_BEGIN: break;
......
......@@ -480,7 +480,16 @@ recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOG
toku_txnid2txn(logger, xid, &txn);
assert(txn==NULL);
}
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL, true);
r = toku_txn_begin_with_xid(
parent,
&txn,
logger,
xid,
TXN_SNAPSHOT_NONE,
NULL,
true, // for_recovery
false // read_only
);
assert(r == 0);
// We only know about it because it was logged. Restore the log bit.
// Logging is 'off' but it will still set the bit.
......
......@@ -34,7 +34,7 @@ static void test_it (int N) {
r = toku_logger_open_rollback(logger, ct, true); CKERR(r);
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
......@@ -46,12 +46,12 @@ static void test_it (int N) {
unsigned int rands[N];
for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, false, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
char key[100],val[300];
DBT k, v;
rands[i] = random();
......@@ -69,12 +69,12 @@ static void test_it (int N) {
if (verbose) printf("i=%d\n", i);
}
for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, false, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
char key[100];
DBT k;
snprintf(key, sizeof(key), "key%x.%x", rands[i], i);
......@@ -94,7 +94,7 @@ static void test_it (int N) {
if (verbose) printf("d=%d\n", i);
}
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, false, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
......
......@@ -50,7 +50,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -62,7 +62,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// insert keys 0, 1, 2, .. (n-1)
......@@ -120,7 +120,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -132,7 +132,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// del keys 0, 2, 4, ...
......@@ -145,7 +145,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
}
TOKUTXN cursortxn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &cursortxn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &cursortxn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
LE_CURSOR cursor = NULL;
......
......@@ -54,7 +54,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -66,7 +66,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// insert keys 0, 1, 2, .. (n-1)
......
......@@ -50,7 +50,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -62,7 +62,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// insert keys 0, 1, 2, .. (n-1)
......
......@@ -51,7 +51,8 @@ void txn_child_manager_unit_test::run_child_txn_test() {
NULL,
&root_txn,
logger,
TXN_SNAPSHOT_CHILD
TXN_SNAPSHOT_CHILD,
false
);
CKERR(r);
// test starting a child txn
......@@ -61,7 +62,8 @@ void txn_child_manager_unit_test::run_child_txn_test() {
root_txn,
&child_txn,
logger,
TXN_SNAPSHOT_CHILD
TXN_SNAPSHOT_CHILD,
false
);
CKERR(r);
......@@ -93,7 +95,8 @@ void txn_child_manager_unit_test::run_test() {
NULL,
&root_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
txn_child_manager* cm = root_txn->child_manager;
......@@ -112,7 +115,8 @@ void txn_child_manager_unit_test::run_test() {
root_txn,
&child_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
assert(child_txn->child_manager == cm);
......@@ -132,7 +136,8 @@ void txn_child_manager_unit_test::run_test() {
child_txn,
&grandchild_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
assert(grandchild_txn->child_manager == cm);
......@@ -157,7 +162,8 @@ void txn_child_manager_unit_test::run_test() {
child_txn,
&grandchild_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
assert(grandchild_txn->child_manager == cm);
......@@ -181,7 +187,8 @@ void txn_child_manager_unit_test::run_test() {
xid,
TXN_SNAPSHOT_NONE,
NULL,
true // for recovery
true, // for recovery
false // read_only
);
assert(recovery_txn->child_manager == cm);
......
......@@ -18,7 +18,7 @@
static void do_txn(TOKULOGGER logger, bool readonly) {
int r;
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
if (!readonly) {
......@@ -40,7 +40,7 @@ static void test_xid_lsn_independent(int N) {
int r;
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun);
......@@ -50,7 +50,7 @@ static void test_xid_lsn_independent(int N) {
CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
TXNID xid_first = txn->txnid.parent_id64;
unsigned int rands[N];
......@@ -65,7 +65,7 @@ static void test_xid_lsn_independent(int N) {
}
{
TOKUTXN txn2;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn2, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn2, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
// Verify the txnid has gone up only by one (even though many log entries were done)
invariant(txn2->txnid.parent_id64 == xid_first + 1);
......@@ -80,7 +80,7 @@ static void test_xid_lsn_independent(int N) {
//TODO(yoni) #5067 will break this portion of the test. (End ids are also assigned, so it would increase by 4 instead of 2.)
// Verify the txnid has gone up only by two (even though many log entries were done)
TOKUTXN txn3;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn3, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn3, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
invariant(txn3->txnid.parent_id64 == xid_first + 2);
r = toku_txn_commit_txn(txn3, false, NULL, NULL);
......@@ -176,7 +176,7 @@ static void test_xid_lsn_independent_parents(int N) {
ZERO_ARRAY(txns_hack);
for (int i = 0; i < N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, txns[i-1], &txns[i], logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, txns[i-1], &txns[i], logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
if (i < num_non_cascade) {
......
......@@ -77,19 +77,49 @@ toku_txn_get_root_id(TOKUTXN txn)
return txn->txnid.parent_id64;
}
bool txn_declared_read_only(TOKUTXN txn) {
return (txn->txnid.parent_id64 == TXNID_NONE);
}
int
toku_txn_begin_txn (
DB_TXN *container_db_txn,
TOKUTXN parent_tokutxn,
TOKUTXN *tokutxn,
TOKULOGGER logger,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
)
{
int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_PAIR_NONE, snapshot_type, container_db_txn, false);
int r = toku_txn_begin_with_xid(
parent_tokutxn,
tokutxn,
logger,
TXNID_PAIR_NONE,
snapshot_type,
container_db_txn,
false, // for_recovery
read_only
);
return r;
}
static void
txn_create_xids(TOKUTXN txn, TOKUTXN parent) {
XIDS xids;
XIDS parent_xids;
if (parent == NULL) {
parent_xids = xids_get_root_xids();
} else {
parent_xids = parent->xids;
}
xids_create_unknown_child(parent_xids, &xids);
TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64;
xids_finalize_with_child(xids, finalized_xid);
txn->xids = xids;
}
int
toku_txn_begin_with_xid (
TOKUTXN parent,
......@@ -98,24 +128,22 @@ toku_txn_begin_with_xid (
TXNID_PAIR xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
bool for_recovery
bool for_recovery,
bool read_only
)
{
int r = 0;
TOKUTXN txn;
XIDS xids;
// Do as much (safe) work as possible before serializing on the txn_manager lock.
XIDS parent_xids;
if (parent == NULL) {
parent_xids = xids_get_root_xids();
} else {
parent_xids = parent->xids;
TOKUTXN txn;
// check for case where we are trying to
// create too many nested transactions
if (!read_only && parent && !xids_can_create_child(parent->xids)) {
r = EINVAL;
goto exit;
}
r = xids_create_unknown_child(parent_xids, &xids);
if (r != 0) {
return r;
if (read_only && parent) {
invariant(txn_declared_read_only(parent));
}
toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, xids, for_recovery);
toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery);
// txnid64, snapshot_txnid64
// will be set in here.
if (for_recovery) {
......@@ -139,7 +167,8 @@ toku_txn_begin_with_xid (
toku_txn_manager_start_txn(
txn,
logger->txn_manager,
snapshot_type
snapshot_type,
read_only
);
}
else {
......@@ -152,10 +181,12 @@ toku_txn_begin_with_xid (
);
}
}
TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64;
xids_finalize_with_child(txn->xids, finalized_xid);
if (!read_only) {
// this call will set txn->xids
txn_create_xids(txn, parent);
}
*txnp = txn;
exit:
return r;
}
......@@ -180,7 +211,6 @@ void toku_txn_create_txn (
TOKULOGGER logger,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
XIDS xids,
bool for_recovery
)
{
......@@ -216,7 +246,7 @@ static txn_child_manager tcm;
.child_manager = NULL,
.container_db_txn = container_db_txn,
.live_root_txn_list = nullptr,
.xids = xids,
.xids = NULL,
.oldest_referenced_xid = TXNID_NONE,
.begin_was_logged = false,
.do_fsync = false,
......@@ -540,7 +570,9 @@ void toku_txn_complete_txn(TOKUTXN txn) {
void toku_txn_destroy_txn(TOKUTXN txn) {
txn->open_fts.destroy();
xids_destroy(&txn->xids);
if (txn->xids) {
xids_destroy(&txn->xids);
}
toku_mutex_destroy(&txn->txn_lock);
toku_mutex_destroy(&txn->state_lock);
toku_cond_destroy(&txn->state_cond);
......@@ -557,10 +589,14 @@ void toku_txn_force_fsync_on_commit(TOKUTXN txn) {
}
TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
invariant(txn->live_root_txn_list->size()>0);
TXNID xid;
int r = txn->live_root_txn_list->fetch(0, &xid);
assert_zero(r);
if (txn->live_root_txn_list->size()>0) {
int r = txn->live_root_txn_list->fetch(0, &xid);
assert_zero(r);
}
else {
xid = TXNID_NONE;
}
return xid;
}
......
......@@ -29,13 +29,15 @@ void toku_txn_lock(TOKUTXN txn);
void toku_txn_unlock(TOKUTXN txn);
uint64_t toku_txn_get_root_id(TOKUTXN txn);
bool txn_declared_read_only(TOKUTXN txn);
int toku_txn_begin_txn (
DB_TXN *container_db_txn,
TOKUTXN parent_tokutxn,
TOKUTXN *tokutxn,
TOKULOGGER logger,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
);
DB_TXN * toku_txn_get_container_db_txn (TOKUTXN tokutxn);
......@@ -49,11 +51,12 @@ int toku_txn_begin_with_xid (
TXNID_PAIR xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
bool for_recovery
bool for_recovery,
bool read_only
);
// Allocate and initialize a txn
void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, XIDS xids, bool for_checkpoint);
void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_checkpoint);
void toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid);
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
......
......@@ -492,13 +492,18 @@ void toku_txn_manager_start_txn_for_recovery(
void toku_txn_manager_start_txn(
TOKUTXN txn,
TXN_MANAGER txn_manager,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
)
{
int r;
TXNID xid = TXNID_NONE;
// if we are running in recovery, we don't need to make snapshots
bool needs_snapshot = txn_needs_snapshot(snapshot_type, NULL);
if (read_only && !needs_snapshot) {
inherit_snapshot_from_parent(txn);
goto exit;
}
// perform a malloc outside of the txn_manager lock
// will be used in txn_manager_create_snapshot_unlocked below
......@@ -528,14 +533,16 @@ void toku_txn_manager_start_txn(
// is taken into account when the transaction is closed.
// add ancestor information, and maintain global live root txn list
xid = ++txn_manager->last_xid;
toku_txn_update_xids_in_txn(txn, xid);
uint32_t idx = txn_manager->live_root_txns.size();
r = txn_manager->live_root_txns.insert_at(txn, idx);
invariant_zero(r);
r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
invariant_zero(r);
txn->oldest_referenced_xid = get_oldest_referenced_xid_unlocked(txn_manager);
if (!read_only) {
xid = ++txn_manager->last_xid;
toku_txn_update_xids_in_txn(txn, xid);
uint32_t idx = txn_manager->live_root_txns.size();
r = txn_manager->live_root_txns.insert_at(txn, idx);
invariant_zero(r);
r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
invariant_zero(r);
txn->oldest_referenced_xid = get_oldest_referenced_xid_unlocked(txn_manager);
}
if (needs_snapshot) {
txn_manager_create_snapshot_unlocked(
......@@ -548,6 +555,8 @@ void toku_txn_manager_start_txn(
verify_snapshot_system(txn_manager);
}
txn_manager_unlock(txn_manager);
exit:
return;
}
TXNID
......@@ -578,6 +587,9 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
int r;
invariant(txn->parent == NULL);
bool is_snapshot = txn_needs_snapshot(txn->snapshot_type, NULL);
if (!is_snapshot && txn_declared_read_only(txn)) {
goto exit;
}
txn_manager_lock(txn_manager);
if (garbage_collection_debug) {
......@@ -593,37 +605,39 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
);
}
uint32_t idx;
//Remove txn from list of live root txns
TOKUTXN txnagain;
r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, &txnagain, &idx);
invariant_zero(r);
invariant(txn==txnagain);
if (!txn_declared_read_only(txn)) {
uint32_t idx;
//Remove txn from list of live root txns
TOKUTXN txnagain;
r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, &txnagain, &idx);
invariant_zero(r);
invariant(txn==txnagain);
r = txn_manager->live_root_txns.delete_at(idx);
invariant_zero(r);
r = txn_manager->live_root_ids.delete_at(idx);
invariant_zero(r);
r = txn_manager->live_root_txns.delete_at(idx);
invariant_zero(r);
r = txn_manager->live_root_ids.delete_at(idx);
invariant_zero(r);
if (!toku_txn_is_read_only(txn) || garbage_collection_debug) {
if (!is_snapshot) {
//
// If it's a snapshot, we already calculated index_in_snapshot_txnids.
// Otherwise, calculate it now.
//
r = txn_manager->snapshot_txnids.find_zero<TXNID, toku_find_xid_by_xid>(txn->txnid.parent_id64, nullptr, &index_in_snapshot_txnids);
invariant(r == DB_NOTFOUND);
}
uint32_t num_references = txn_manager->snapshot_txnids.size() - index_in_snapshot_txnids;
if (num_references > 0) {
// This transaction exists in a live list of another transaction.
struct referenced_xid_tuple tuple = {
.begin_id = txn->txnid.parent_id64,
.end_id = ++txn_manager->last_xid,
.references = num_references
};
r = txn_manager->referenced_xids.insert<TXNID, find_tuple_by_xid>(tuple, txn->txnid.parent_id64, nullptr);
lazy_assert_zero(r);
if (!toku_txn_is_read_only(txn) || garbage_collection_debug) {
if (!is_snapshot) {
//
// If it's a snapshot, we already calculated index_in_snapshot_txnids.
// Otherwise, calculate it now.
//
r = txn_manager->snapshot_txnids.find_zero<TXNID, toku_find_xid_by_xid>(txn->txnid.parent_id64, nullptr, &index_in_snapshot_txnids);
invariant(r == DB_NOTFOUND);
}
uint32_t num_references = txn_manager->snapshot_txnids.size() - index_in_snapshot_txnids;
if (num_references > 0) {
// This transaction exists in a live list of another transaction.
struct referenced_xid_tuple tuple = {
.begin_id = txn->txnid.parent_id64,
.end_id = ++txn_manager->last_xid,
.references = num_references
};
r = txn_manager->referenced_xids.insert<TXNID, find_tuple_by_xid>(tuple, txn->txnid.parent_id64, nullptr);
lazy_assert_zero(r);
}
}
}
......@@ -638,6 +652,8 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
txn->live_root_txn_list->destroy();
toku_free(txn->live_root_txn_list);
}
exit:
return;
}
void toku_txn_manager_clone_state_for_gc(
......
......@@ -58,7 +58,8 @@ void toku_txn_manager_handle_snapshot_destroy_for_child_txn(
void toku_txn_manager_start_txn(
TOKUTXN txn,
TXN_MANAGER txn_manager,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
);
void toku_txn_manager_start_txn_for_recovery(
......
......@@ -62,6 +62,12 @@ xids_get_root_xids(void) {
return rval;
}
bool
xids_can_create_child(XIDS xids) {
invariant(xids->num_xids < MAX_TRANSACTION_RECORDS);
return (xids->num_xids + 1) != MAX_TRANSACTION_RECORDS;
}
int
xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p) {
......@@ -70,17 +76,15 @@ xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p) {
int rval;
invariant(parent_xids);
uint32_t num_child_xids = parent_xids->num_xids + 1;
invariant(num_child_xids > 0);
invariant(num_child_xids <= MAX_TRANSACTION_RECORDS);
if (num_child_xids == MAX_TRANSACTION_RECORDS) rval = EINVAL;
else {
size_t new_size = sizeof(*parent_xids) + num_child_xids*sizeof(parent_xids->ids[0]);
XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(new_size));
// Clone everything (parent does not have the newest xid).
memcpy(xids, parent_xids, new_size - sizeof(xids->ids[0]));
*xids_p = xids;
rval = 0;
}
// assumes that caller has verified that num_child_xids will
// be less than MAX_TRANSACTIN_RECORDS
invariant(num_child_xids < MAX_TRANSACTION_RECORDS);
size_t new_size = sizeof(*parent_xids) + num_child_xids*sizeof(parent_xids->ids[0]);
XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(new_size));
// Clone everything (parent does not have the newest xid).
memcpy(xids, parent_xids, new_size - sizeof(xids->ids[0]));
*xids_p = xids;
rval = 0;
return rval;
}
......@@ -99,11 +103,13 @@ int
xids_create_child(XIDS parent_xids, // xids list for parent transaction
XIDS * xids_p, // xids list created
TXNID this_xid) { // xid of this transaction (new innermost)
int rval = xids_create_unknown_child(parent_xids, xids_p);
if (rval == 0) {
xids_finalize_with_child(*xids_p, this_xid);
bool can_create_child = xids_can_create_child(parent_xids);
if (!can_create_child) {
return EINVAL;
}
return rval;
xids_create_unknown_child(parent_xids, xids_p);
xids_finalize_with_child(*xids_p, this_xid);
return 0;
}
void
......
......@@ -28,6 +28,8 @@
//Retrieve an XIDS representing the root transaction.
XIDS xids_get_root_xids(void);
bool xids_can_create_child(XIDS xids);
void xids_cpy(XIDS target, XIDS source);
//Creates an XIDS representing this transaction.
......
......@@ -161,6 +161,7 @@ toku_indexer_create_indexer(DB_ENV *env,
{
int rval;
DB_INDEXER *indexer = 0; // set later when created
HANDLE_READ_ONLY_TXN(txn);
*indexerp = NULL;
......
......@@ -169,6 +169,7 @@ toku_loader_create_loader(DB_ENV *env,
uint32_t loader_flags,
bool check_empty) {
int rval;
HANDLE_READ_ONLY_TXN(txn);
*blp = NULL; // set later when created
......
......@@ -24,7 +24,7 @@
static int create_child_txn(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
DB_TXN* child_txn = NULL;
DB_ENV* env = arg->env;
int r = env->txn_begin(env, txn, &child_txn, arg->txn_type);
int r = env->txn_begin(env, txn, &child_txn, arg->txn_flags);
CKERR(r);
r = child_txn->commit(child_txn, 0);
CKERR(r);
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id: perf_nop.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of creating and destroying
// root read-only transactions that create snapshots
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
myargs[i].operation = nop;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
args.single_txn = false;
args.num_elements = 0;
args.num_DBs = 0;
args.num_put_threads = 0;
args.num_update_threads = 0;
stress_test_main(&args);
return 0;
}
......@@ -31,7 +31,7 @@ static int commit_and_create_txn(
int rand_txn_id = random() % num_txns;
int r = txns[rand_txn_id]->commit(txns[rand_txn_id], 0);
CKERR(r);
r = arg->env->txn_begin(arg->env, 0, &txns[rand_txn_id], arg->txn_type);
r = arg->env->txn_begin(arg->env, 0, &txns[rand_txn_id], arg->txn_flags);
CKERR(r);
return 0;
}
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
int test_main(int argc, char * const argv[])
{
int r;
DB * db;
DB_ENV * env;
(void) argc;
(void) argv;
char buf[200];
snprintf(buf, 200, "rm -rf " ENVDIR);
r = system(buf); { int chk_r = r; CKERR(chk_r); }
r = toku_os_mkdir(ENVDIR, 0755); { int chk_r = r; CKERR(chk_r); }
// set things up
r = db_env_create(&env, 0);
CKERR(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
CKERR(r);
r = db_create(&db, env, 0);
CKERR(r);
r = db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
CKERR(r);
DB_TXN* txn = NULL;
r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT);
CKERR(r);
int k = 1;
int v = 10;
DBT key, val;
r = db->put(
db,
txn,
dbt_init(&key, &k, sizeof k),
dbt_init(&val, &v, sizeof v),
0
);
CKERR(r);
k = 2;
v = 20;
r = db->put(
db,
txn,
dbt_init(&key, &k, sizeof k),
dbt_init(&val, &v, sizeof v),
0
);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY);
CKERR(r);
DBC* cursor = NULL;
r = db->cursor(db, txn, &cursor, 0);
CKERR(r);
DBT key1, val1;
memset(&key1, 0, sizeof key1);
memset(&val1, 0, sizeof val1);
r = cursor->c_get(cursor, &key1, &val1, DB_FIRST);
CKERR(r);
invariant(key1.size == sizeof(int));
invariant(*(int *)key1.data == 1);
invariant(val1.size == sizeof(int));
invariant(*(int *)val1.data == 10);
r = cursor->c_get(cursor, &key1, &val1, DB_NEXT);
CKERR(r);
invariant(key1.size == sizeof(int));
invariant(*(int *)key1.data == 2);
invariant(val1.size == sizeof(int));
invariant(*(int *)val1.data == 20);
r = cursor->c_close(cursor);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
// clean things up
r = db->close(db, 0);
CKERR(r);
r = env->close(env, 0);
CKERR(r);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
static int update_fun(DB *UU(db),
const DBT *UU(key),
const DBT *UU(old_val), const DBT *UU(extra),
void (*set_val)(const DBT *new_val,
void *set_extra),
void *UU(set_extra))
{
abort();
assert(set_val != NULL);
return 0;
}
static int generate_row_for_put(
DB *UU(dest_db),
DB *UU(src_db),
DBT *UU(dest_key),
DBT *UU(dest_val),
const DBT *UU(src_key),
const DBT *UU(src_val)
)
{
abort();
return 0;
}
static int generate_row_for_del(
DB *UU(dest_db),
DB *UU(src_db),
DBT *UU(dest_key),
const DBT *UU(src_key),
const DBT *UU(src_val)
)
{
abort();
return 0;
}
static void test_invalid_ops(uint32_t iso_flags) {
int r;
DB * db;
DB_ENV * env;
char buf[200];
snprintf(buf, 200, "rm -rf " ENVDIR);
r = system(buf); { int chk_r = r; CKERR(chk_r); }
r = toku_os_mkdir(ENVDIR, 0755); { int chk_r = r; CKERR(chk_r); }
// set things up
r = db_env_create(&env, 0);
CKERR(r);
r = env->set_generate_row_callback_for_put(env,generate_row_for_put);
CKERR(r);
r = env->set_generate_row_callback_for_del(env,generate_row_for_del);
CKERR(r);
env->set_update(env, update_fun);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
CKERR(r);
r = db_create(&db, env, 0);
CKERR(r);
DB_TXN* txn = NULL;
r = env->txn_begin(env, 0, &txn, iso_flags | DB_TXN_READ_ONLY);
CKERR(r);
r = db->open(db, txn, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
CKERR2(r, EINVAL);
r = db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
CKERR(r);
int k = 1;
int v = 10;
DBT key, val;
dbt_init(&key, &k, sizeof k);
dbt_init(&val, &v, sizeof v);
uint32_t db_flags = 0;
uint32_t indexer_flags = 0;
DB_INDEXER* indexer;
r = env->create_indexer(
env,
txn,
&indexer,
db,
1,
&db,
&db_flags,
indexer_flags
);
CKERR2(r, EINVAL);
// test invalid operations of ydb_db.cc,
// db->open tested above
DB_LOADER* loader;
uint32_t put_flags = 0;
uint32_t dbt_flags = 0;
r = env->create_loader(env, txn, &loader, NULL, 1, &db, &put_flags, &dbt_flags, 0);
CKERR2(r, EINVAL);
r = db->change_descriptor(db, txn, &key, 0);
CKERR2(r, EINVAL);
//
// test invalid operations return EINVAL from ydb_write.cc
//
r = db->put(db, txn, &key, &val,0);
CKERR2(r, EINVAL);
r = db->del(db, txn, &key, DB_DELETE_ANY);
CKERR2(r, EINVAL);
r = db->update(db, txn, &key, &val, 0);
CKERR2(r, EINVAL);
r = db->update_broadcast(db, txn, &val, 0);
CKERR2(r, EINVAL);
r = env->put_multiple(env, NULL, txn, &key, &val, 1, &db, &key, &val, 0);
CKERR2(r, EINVAL);
r = env->del_multiple(env, NULL, txn, &key, &val, 1, &db, &key, 0);
CKERR2(r, EINVAL);
uint32_t flags;
r = env->update_multiple(
env, NULL, txn,
&key, &val,
&key, &val,
1, &db, &flags,
1, &key,
1, &val
);
CKERR2(r, EINVAL);
r = db->close(db, 0);
CKERR(r);
// test invalid operations of ydb.cc, dbrename and dbremove
r = env->dbremove(env, txn, "foo.db", NULL, 0);
CKERR2(r, EINVAL);
// test invalid operations of ydb.cc, dbrename and dbremove
r = env->dbrename(env, txn, "foo.db", NULL, "bar.db", 0);
CKERR2(r, EINVAL);
r = txn->commit(txn, 0);
CKERR(r);
// clean things up
r = env->close(env, 0);
CKERR(r);
}
int test_main(int argc, char * const argv[]) {
(void) argc;
(void) argv;
test_invalid_ops(0);
test_invalid_ops(DB_TXN_SNAPSHOT);
test_invalid_ops(DB_READ_COMMITTED);
test_invalid_ops(DB_READ_UNCOMMITTED);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
static void test_read_txn_creation(DB_ENV* env, uint32_t iso_flags) {
int r;
DB_TXN* parent_txn = NULL;
DB_TXN* child_txn = NULL;
r = env->txn_begin(env, 0, &parent_txn, iso_flags);
CKERR(r);
r = env->txn_begin(env, parent_txn, &child_txn, iso_flags | DB_TXN_READ_ONLY);
CKERR2(r, EINVAL);
r = env->txn_begin(env, parent_txn, &child_txn, iso_flags);
CKERR(r);
r = child_txn->commit(child_txn, 0);
CKERR(r);
r = parent_txn->commit(parent_txn, 0);
CKERR(r);
r = env->txn_begin(env, 0, &parent_txn, iso_flags | DB_TXN_READ_ONLY);
CKERR(r);
r = env->txn_begin(env, parent_txn, &child_txn, iso_flags | DB_TXN_READ_ONLY);
CKERR(r);
r = child_txn->commit(child_txn, 0);
CKERR(r);
r = env->txn_begin(env, parent_txn, &child_txn, iso_flags);
CKERR(r);
r = child_txn->commit(child_txn, 0);
CKERR(r);
r = parent_txn->commit(parent_txn, 0);
CKERR(r);
}
int test_main(int argc, char * const argv[])
{
int r;
DB_ENV * env;
(void) argc;
(void) argv;
char buf[200];
snprintf(buf, 200, "rm -rf " ENVDIR);
r = system(buf); { int chk_r = r; CKERR(chk_r); }
r = toku_os_mkdir(ENVDIR, 0755); { int chk_r = r; CKERR(chk_r); }
// set things up
r = db_env_create(&env, 0);
CKERR(r);
r = env->open(env, ENVDIR, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
CKERR(r);
test_read_txn_creation(env, 0);
test_read_txn_creation(env, DB_TXN_SNAPSHOT);
test_read_txn_creation(env, DB_READ_COMMITTED);
test_read_txn_creation(env, DB_READ_UNCOMMITTED);
r = env->close(env, 0);
CKERR(r);
return 0;
}
......@@ -69,6 +69,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[1].prefetch = false;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op;
myargs[1].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward fast scanner
soe[2].fast = true;
......@@ -76,6 +77,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[2].prefetch = false;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op;
myargs[2].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward slow scanner
soe[3].fast = false;
......
......@@ -63,6 +63,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[1].prefetch = false;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op;
myargs[1].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward fast scanner
soe[2].fast = true;
......@@ -70,6 +71,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[2].prefetch = false;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op;
myargs[2].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward slow scanner
soe[3].fast = false;
......
......@@ -62,6 +62,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[1].prefetch = false;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op;
myargs[1].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward fast scanner
soe[2].fast = true;
......@@ -69,6 +70,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[2].prefetch = false;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op;
myargs[2].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward slow scanner
soe[3].fast = false;
......
......@@ -36,6 +36,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[0].prefetch = false;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op;
myargs[0].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the forward slow scanner
soe[1].fast = false;
......
......@@ -150,7 +150,7 @@ struct arg {
// DB are in [0, num_elements)
// false otherwise
int sleep_ms; // number of milliseconds to sleep between operations
uint32_t txn_type; // isolation level for txn running operation
uint32_t txn_flags; // isolation level for txn running operation
operation_t operation; // function that is the operation to be run
void* operation_extra; // extra parameter passed to operation
enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
......@@ -169,7 +169,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->bounded_element_range = true;
arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT;
arg->txn_flags = DB_TXN_SNAPSHOT;
arg->operation_extra = nullptr;
arg->do_prepare = false;
arg->prelock_updates = false;
......@@ -501,12 +501,12 @@ static void *worker(void *arg_v) {
printf("%lu starting %p\n", (unsigned long) intself, arg->operation);
}
if (arg->cli->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
}
while (run_test) {
lock_worker_op(we);
if (!arg->cli->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
}
r = arg->operation(txn, arg, arg->operation_extra, we->counters);
if (r==0 && !arg->cli->single_txn && arg->do_prepare) {
......@@ -2507,7 +2507,7 @@ UU() stress_recover(struct cli_args *args) {
DB_TXN* txn = nullptr;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_flags);
CKERR(r);
struct scan_op_extra soe = {
.fast = true,
......
......@@ -206,6 +206,16 @@ env_opened(DB_ENV *env) {
return env->i->cachetable != 0;
}
static inline bool
txn_is_read_only(DB_TXN* txn) {
if (txn && (db_txn_struct_i(txn)->flags & DB_TXN_READ_ONLY)) {
return true;
}
return false;
}
#define HANDLE_READ_ONLY_TXN(txn) if(txn_is_read_only(txn)) return EINVAL;
void env_panic(DB_ENV * env, int cause, const char * msg);
void env_note_db_opened(DB_ENV *env, DB *db);
void env_note_db_closed(DB_ENV *env, DB *db);
......
......@@ -1117,6 +1117,7 @@ static int
locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
int ret, r;
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
HANDLE_READ_ONLY_TXN(txn);
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
......@@ -1147,6 +1148,7 @@ static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char
static int
locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
int ret, r;
HANDLE_READ_ONLY_TXN(txn);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
DB_TXN *child_txn = NULL;
......@@ -2307,6 +2309,7 @@ env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u
if (!env_opened(env) || flags != 0) {
return EINVAL;
}
HANDLE_READ_ONLY_TXN(txn);
if (dbname != NULL) {
// env_dbremove_subdb() converts (fname, dbname) to dname
return env_dbremove_subdb(env, txn, fname, dbname, flags);
......@@ -2413,6 +2416,7 @@ env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, co
if (!env_opened(env) || flags != 0) {
return EINVAL;
}
HANDLE_READ_ONLY_TXN(txn);
if (dbname != NULL) {
// env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
......
......@@ -210,6 +210,7 @@ static uint64_t nontransactional_open_id = 0;
static int
toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
HANDLE_PANICKED_DB(db);
HANDLE_READ_ONLY_TXN(txn);
if (dbname != NULL) {
return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
}
......@@ -347,6 +348,7 @@ void toku_db_lt_on_destroy_callback(toku::locktree *lt) {
int
toku_db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, int mode) {
//Set comparison functions if not yet set.
HANDLE_READ_ONLY_TXN(txn);
if (!db->i->key_compare_was_set && db->dbenv->i->bt_compare) {
toku_ft_set_bt_compare(db->i->ft_handle, db->dbenv->i->bt_compare);
db->i->key_compare_was_set = true;
......@@ -469,6 +471,7 @@ int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
static int
toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, uint32_t flags) {
HANDLE_PANICKED_DB(db);
HANDLE_READ_ONLY_TXN(txn);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
int r = 0;
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
......@@ -695,6 +698,7 @@ autotxn_db_getf_set (DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK
static int
locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
int ret, r;
HANDLE_READ_ONLY_TXN(txn);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
//
......@@ -1024,6 +1028,7 @@ load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], const char * new
int
locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
int ret, r;
HANDLE_READ_ONLY_TXN(txn);
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
......
......@@ -322,6 +322,36 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
uint32_t txn_flags = 0;
txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks. RFP remove this?
// handle whether txn is declared as read only
bool parent_txn_declared_read_only =
stxn &&
(db_txn_struct_i(stxn)->flags & DB_TXN_READ_ONLY);
bool txn_declared_read_only = false;
if (flags & DB_TXN_READ_ONLY) {
txn_declared_read_only = true;
txn_flags |= DB_TXN_READ_ONLY;
flags &= ~(DB_TXN_READ_ONLY);
}
if (txn_declared_read_only && stxn &&
!parent_txn_declared_read_only
)
{
return toku_ydb_do_error(
env,
EINVAL,
"Current transaction set as read only, but parent transaction is not\n"
);
}
if (parent_txn_declared_read_only)
{
// don't require child transaction to also set transaction as read only
// if parent has already done so
txn_flags |= DB_TXN_READ_ONLY;
txn_declared_read_only = true;
}
TOKU_ISOLATION child_isolation = TOKU_ISO_SERIALIZABLE;
uint32_t iso_flags = flags & DB_ISOLATION_FLAGS;
if (!(iso_flags == 0 ||
......@@ -427,7 +457,8 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
TXNID_PAIR_NONE,
snapshot_type,
result,
false
false, // for_recovery
txn_declared_read_only // read_only
);
if (r != 0) {
toku_free(result);
......
......@@ -132,6 +132,7 @@ int
toku_db_del(DB *db, DB_TXN *txn, DBT *key, uint32_t flags, bool holds_mo_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
uint32_t unchecked_flags = flags;
//DB_DELETE_ANY means delete regardless of whether it exists in the db.
......@@ -175,6 +176,7 @@ int
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, uint32_t flags, bool holds_mo_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
int r = 0;
uint32_t lock_flags = get_prelocked_flags(flags);
......@@ -222,6 +224,7 @@ toku_db_update(DB *db, DB_TXN *txn,
uint32_t flags) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
int r = 0;
uint32_t lock_flags = get_prelocked_flags(flags);
......@@ -263,6 +266,7 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn,
uint32_t flags) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
int r = 0;
uint32_t lock_flags = get_prelocked_flags(flags);
......@@ -428,6 +432,7 @@ env_del_multiple(
DB_INDEXER* indexer = NULL;
HANDLE_PANICKED_ENV(env);
HANDLE_READ_ONLY_TXN(txn);
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
......@@ -574,6 +579,7 @@ env_put_multiple_internal(
DB_INDEXER* indexer = NULL;
HANDLE_PANICKED_ENV(env);
HANDLE_READ_ONLY_TXN(txn);
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
......@@ -674,6 +680,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
HANDLE_PANICKED_ENV(env);
DB_INDEXER* indexer = NULL;
HANDLE_READ_ONLY_TXN(txn);
if (!txn) {
r = EINVAL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment