Commit 2f446f25 authored by Sergei Golubchik's avatar Sergei Golubchik

Merge commit 'tokudb-ft-index/tokudb-7.5.6' into 5.5

parents 939a2334 f3dcb78e
...@@ -572,7 +572,7 @@ static void print_db_txn_struct (void) { ...@@ -572,7 +572,7 @@ static void print_db_txn_struct (void) {
STRUCT_SETUP(DB_TXN, abort, "int (*%s) (DB_TXN *)"); STRUCT_SETUP(DB_TXN, abort, "int (*%s) (DB_TXN *)");
STRUCT_SETUP(DB_TXN, api_internal,"void *%s"); STRUCT_SETUP(DB_TXN, api_internal,"void *%s");
STRUCT_SETUP(DB_TXN, commit, "int (*%s) (DB_TXN*, uint32_t)"); STRUCT_SETUP(DB_TXN, commit, "int (*%s) (DB_TXN*, uint32_t)");
STRUCT_SETUP(DB_TXN, prepare, "int (*%s) (DB_TXN*, uint8_t gid[DB_GID_SIZE])"); STRUCT_SETUP(DB_TXN, prepare, "int (*%s) (DB_TXN*, uint8_t gid[DB_GID_SIZE], uint32_t flags)");
STRUCT_SETUP(DB_TXN, discard, "int (*%s) (DB_TXN*, uint32_t)"); STRUCT_SETUP(DB_TXN, discard, "int (*%s) (DB_TXN*, uint32_t)");
STRUCT_SETUP(DB_TXN, id, "uint32_t (*%s) (DB_TXN *)"); STRUCT_SETUP(DB_TXN, id, "uint32_t (*%s) (DB_TXN *)");
STRUCT_SETUP(DB_TXN, mgrp, "DB_ENV *%s /* In TokuFT, mgrp is a DB_ENV, not a DB_TXNMGR */"); STRUCT_SETUP(DB_TXN, mgrp, "DB_ENV *%s /* In TokuFT, mgrp is a DB_ENV, not a DB_TXNMGR */");
...@@ -581,11 +581,12 @@ static void print_db_txn_struct (void) { ...@@ -581,11 +581,12 @@ static void print_db_txn_struct (void) {
"int (*txn_stat)(DB_TXN *, struct txn_stat **)", "int (*txn_stat)(DB_TXN *, struct txn_stat **)",
"int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)", "int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)", "int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *)", "int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *, uint32_t flags)",
"uint64_t (*id64) (DB_TXN*)", "uint64_t (*id64) (DB_TXN*)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id)", "void (*set_client_id)(DB_TXN *, uint64_t client_id)",
"uint64_t (*get_client_id)(DB_TXN *)", "uint64_t (*get_client_id)(DB_TXN *)",
"bool (*is_prepared)(DB_TXN *)", "bool (*is_prepared)(DB_TXN *)",
"DB_TXN *(*get_child)(DB_TXN *)",
NULL}; NULL};
sort_and_dump_fields("db_txn", false, extra); sort_and_dump_fields("db_txn", false, extra);
} }
...@@ -614,7 +615,7 @@ static void print_dbc_struct (void) { ...@@ -614,7 +615,7 @@ static void print_dbc_struct (void) {
"int (*c_getf_set_range_reverse)(DBC *, uint32_t, DBT *, YDB_CALLBACK_FUNCTION, void *)", "int (*c_getf_set_range_reverse)(DBC *, uint32_t, DBT *, YDB_CALLBACK_FUNCTION, void *)",
"int (*c_getf_set_range_with_bound)(DBC *, uint32_t, DBT *k, DBT *k_bound, YDB_CALLBACK_FUNCTION, void *)", "int (*c_getf_set_range_with_bound)(DBC *, uint32_t, DBT *k, DBT *k_bound, YDB_CALLBACK_FUNCTION, void *)",
"int (*c_set_bounds)(DBC*, const DBT*, const DBT*, bool pre_acquire, int out_of_range_error)", "int (*c_set_bounds)(DBC*, const DBT*, const DBT*, bool pre_acquire, int out_of_range_error)",
"void (*c_set_check_interrupt_callback)(DBC*, bool (*)(void*), void *)", "void (*c_set_check_interrupt_callback)(DBC*, bool (*)(void*, uint64_t deleted_rows), void *)",
"void (*c_remove_restriction)(DBC*)", "void (*c_remove_restriction)(DBC*)",
"char _internal[512]", "char _internal[512]",
NULL}; NULL};
......
...@@ -655,7 +655,7 @@ int toku_upgrade_msn_from_root_to_header(int fd, FT ft) __attribute__((nonnull)) ...@@ -655,7 +655,7 @@ int toku_upgrade_msn_from_root_to_header(int fd, FT ft) __attribute__((nonnull))
// When lock_only is true, the callback only does optional lock tree locking. // When lock_only is true, the callback only does optional lock tree locking.
typedef int (*FT_GET_CALLBACK_FUNCTION)(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only); typedef int (*FT_GET_CALLBACK_FUNCTION)(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only);
typedef bool (*FT_CHECK_INTERRUPT_CALLBACK)(void *extra); typedef bool (*FT_CHECK_INTERRUPT_CALLBACK)(void *extra, uint64_t deleted_rows);
struct ft_cursor; struct ft_cursor;
int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, struct ft_cursor *ftcursor, bool can_bulk_fetch); int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, struct ft_cursor *ftcursor, bool can_bulk_fetch);
...@@ -3387,7 +3387,7 @@ ok: ; ...@@ -3387,7 +3387,7 @@ ok: ;
idx++; idx++;
if (idx >= bn->data_buffer.num_klpairs() || ((n_deleted % 64) == 0 && !search_continue(search, key, keylen))) { if (idx >= bn->data_buffer.num_klpairs() || ((n_deleted % 64) == 0 && !search_continue(search, key, keylen))) {
STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted); STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra)) { if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) {
return TOKUDB_INTERRUPTED; return TOKUDB_INTERRUPTED;
} }
return DB_NOTFOUND; return DB_NOTFOUND;
...@@ -3396,7 +3396,7 @@ ok: ; ...@@ -3396,7 +3396,7 @@ ok: ;
case FT_SEARCH_RIGHT: case FT_SEARCH_RIGHT:
if (idx == 0) { if (idx == 0) {
STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted); STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra)) { if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) {
return TOKUDB_INTERRUPTED; return TOKUDB_INTERRUPTED;
} }
return DB_NOTFOUND; return DB_NOTFOUND;
...@@ -3410,6 +3410,8 @@ ok: ; ...@@ -3410,6 +3410,8 @@ ok: ;
assert_zero(r); // we just validated the index assert_zero(r); // we just validated the index
if (!le_val_is_del(le, ftcursor->is_snapshot_read, ftcursor->ttxn)) { if (!le_val_is_del(le, ftcursor->is_snapshot_read, ftcursor->ttxn)) {
STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted); STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
if (ftcursor->interrupt_cb)
ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted);
goto got_a_good_value; goto got_a_good_value;
} }
} }
......
...@@ -1064,6 +1064,11 @@ garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *e ...@@ -1064,6 +1064,11 @@ garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *e
goto exit; goto exit;
} }
} }
{
float a = info->used_space, b=info->total_space;
float percentage = (1 - (a / b)) * 100;
printf("LeafNode# %d has %d BasementNodes and %2.1f%% of the allocated space is garbage\n", (int)blocknum.b, node->n_children, percentage);
}
exit: exit:
toku_ftnode_free(&node); toku_ftnode_free(&node);
toku_free(ndd); toku_free(ndd);
......
...@@ -714,7 +714,7 @@ static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l ...@@ -714,7 +714,7 @@ static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l
} }
switch (renv->ss.ss) { switch (renv->ss.ss) {
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: { case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
toku_txn_prepare_txn(txn, l->xa_xid); toku_txn_prepare_txn(txn, l->xa_xid, 0);
break; break;
} }
case FORWARD_NEWER_CHECKPOINT_END: { case FORWARD_NEWER_CHECKPOINT_END: {
...@@ -778,7 +778,7 @@ static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) ...@@ -778,7 +778,7 @@ static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv)
assert(txn!=NULL); assert(txn!=NULL);
// Save the transaction // Save the transaction
toku_txn_prepare_txn(txn, l->xa_xid); toku_txn_prepare_txn(txn, l->xa_xid, 0);
return 0; return 0;
} }
......
...@@ -558,7 +558,7 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) { ...@@ -558,7 +558,7 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length); memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length);
} }
void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) { void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid, int nosync) {
if (txn->parent || toku_txn_is_read_only(txn)) { if (txn->parent || toku_txn_is_read_only(txn)) {
// We do not prepare children. // We do not prepare children.
// //
...@@ -573,7 +573,7 @@ void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) { ...@@ -573,7 +573,7 @@ void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
txn->state = TOKUTXN_PREPARING; txn->state = TOKUTXN_PREPARING;
toku_txn_unlock_state(txn); toku_txn_unlock_state(txn);
// Do we need to do an fsync? // Do we need to do an fsync?
txn->do_fsync = (txn->force_fsync_on_commit || txn->roll_info.num_rollentries>0); txn->do_fsync = txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0);
copy_xid(&txn->xa_xid, xa_xid); copy_xid(&txn->xa_xid, xa_xid);
// This list will go away with #4683, so we wn't need the ydb lock for this anymore. // This list will go away with #4683, so we wn't need the ydb lock for this anymore.
toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid, xa_xid); toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid, xa_xid);
......
...@@ -302,7 +302,7 @@ int toku_txn_abort_with_lsn(struct tokutxn *txn, LSN oplsn, ...@@ -302,7 +302,7 @@ int toku_txn_abort_with_lsn(struct tokutxn *txn, LSN oplsn,
int toku_txn_discard_txn(struct tokutxn *txn); int toku_txn_discard_txn(struct tokutxn *txn);
void toku_txn_prepare_txn (struct tokutxn *txn, TOKU_XA_XID *xid); void toku_txn_prepare_txn (struct tokutxn *txn, TOKU_XA_XID *xid, int nosync);
// Effect: Do the internal work of preparing a transaction (does not log the prepare record). // Effect: Do the internal work of preparing a transaction (does not log the prepare record).
void toku_txn_get_prepared_xa_xid(struct tokutxn *txn, TOKU_XA_XID *xa_xid); void toku_txn_get_prepared_xa_xid(struct tokutxn *txn, TOKU_XA_XID *xa_xid);
......
...@@ -147,7 +147,7 @@ run_test(void) { ...@@ -147,7 +147,7 @@ run_test(void) {
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE); memset(gid, 0, DB_GID_SIZE);
gid[0]=42; gid[0]=42;
r = txn->prepare(txn, gid); CKERR(r); r = txn->prepare(txn, gid, 0); CKERR(r);
} }
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r); r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
......
...@@ -122,7 +122,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi ...@@ -122,7 +122,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE); memset(gid, 0, DB_GID_SIZE);
gid[0]=42; gid[0]=42;
CKERR(txn->prepare(txn, gid)); CKERR(txn->prepare(txn, gid, 0));
if (commit) if (commit)
CKERR(txn->commit(txn, 0)); CKERR(txn->commit(txn, 0));
} }
......
...@@ -124,7 +124,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi ...@@ -124,7 +124,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE); memset(gid, 0, DB_GID_SIZE);
gid[0]=42; gid[0]=42;
CKERR(txn->prepare(txn, gid)); CKERR(txn->prepare(txn, gid, 0));
if (commit) if (commit)
CKERR(txn->commit(txn, 0)); CKERR(txn->commit(txn, 0));
} }
......
...@@ -147,7 +147,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir) { ...@@ -147,7 +147,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir) {
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE); memset(gid, 0, DB_GID_SIZE);
gid[0]='a'+tnum; gid[0]='a'+tnum;
CKERR(txn->prepare(txn, gid)); CKERR(txn->prepare(txn, gid, 0));
// Drop txn on the ground, since we will commit or abort it after recovery // Drop txn on the ground, since we will commit or abort it after recovery
if (tnum==0) { if (tnum==0) {
//printf("commit %d\n", tnum); //printf("commit %d\n", tnum);
......
...@@ -125,7 +125,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi ...@@ -125,7 +125,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi
.gtrid_length = 8, .gtrid_length = 8,
.bqual_length = 9}; .bqual_length = 9};
for (int i=0; i<8+9; i++) x.data[i] = 42+i; for (int i=0; i<8+9; i++) x.data[i] = 42+i;
CKERR(txn->xa_prepare(txn, &x)); CKERR(txn->xa_prepare(txn, &x, 0));
if (commit) if (commit)
CKERR(txn->commit(txn, 0)); CKERR(txn->commit(txn, 0));
} }
......
...@@ -118,7 +118,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi ...@@ -118,7 +118,7 @@ static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commi
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE); memset(gid, 0, DB_GID_SIZE);
gid[0]=42; gid[0]=42;
CKERR(txn->prepare(txn, gid)); CKERR(txn->prepare(txn, gid, 0));
{ int chk_r = db->close(db, 0); CKERR(chk_r); } { int chk_r = db->close(db, 0); CKERR(chk_r); }
if (commit) if (commit)
CKERR(txn->commit(txn, 0)); CKERR(txn->commit(txn, 0));
......
...@@ -133,7 +133,7 @@ test_main(int argc, char *const argv[]) { ...@@ -133,7 +133,7 @@ test_main(int argc, char *const argv[]) {
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE); memset(gid, 0, DB_GID_SIZE);
gid[0]='a'; gid[0]='a';
r = child_txn->prepare(child_txn, gid); r = child_txn->prepare(child_txn, gid, 0);
CKERR(r); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); r = env->txn_checkpoint(env, 0, 0, 0);
......
...@@ -95,12 +95,12 @@ PATENT RIGHTS GRANT: ...@@ -95,12 +95,12 @@ PATENT RIGHTS GRANT:
int num_interrupts_called; int num_interrupts_called;
static bool interrupt(void* extra UU()) { static bool interrupt(void* extra UU(), uint64_t rows UU()) {
num_interrupts_called++; num_interrupts_called++;
return false; return false;
} }
static bool interrupt_true(void* extra UU()) { static bool interrupt_true(void* extra UU(), uint64_t rows UU()) {
num_interrupts_called++; num_interrupts_called++;
return true; return true;
} }
......
...@@ -211,7 +211,7 @@ cleanup: ...@@ -211,7 +211,7 @@ cleanup:
gid_count++; gid_count++;
uint32_t *hi_gid_count_p = cast_to_typeof(hi_gid_count_p) hi_gid; // make gcc --happy about -Wstrict-aliasing uint32_t *hi_gid_count_p = cast_to_typeof(hi_gid_count_p) hi_gid; // make gcc --happy about -Wstrict-aliasing
*hi_gid_count_p = gid_count; *hi_gid_count_p = gid_count;
int rr = hi_txn->prepare(hi_txn, hi_gid); int rr = hi_txn->prepare(hi_txn, hi_gid, 0);
CKERR(rr); CKERR(rr);
if (r || (random() % 2)) { if (r || (random() % 2)) {
rr = hi_txn->abort(hi_txn); rr = hi_txn->abort(hi_txn);
......
...@@ -130,7 +130,7 @@ test_txn_close_before_prepare_commit (void) { ...@@ -130,7 +130,7 @@ test_txn_close_before_prepare_commit (void) {
uint8_t gid[DB_GID_SIZE]; uint8_t gid[DB_GID_SIZE];
memset(gid, 1, DB_GID_SIZE); memset(gid, 1, DB_GID_SIZE);
r = txn->prepare(txn, gid); assert(r == 0); r = txn->prepare(txn, gid, 0); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0); r = txn->commit(txn, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0); r = env->close(env, 0); assert(r == 0);
......
...@@ -593,7 +593,7 @@ static void *worker(void *arg_v) { ...@@ -593,7 +593,7 @@ static void *worker(void *arg_v) {
uint64_t gid_val = txn->id64(txn); uint64_t gid_val = txn->id64(txn);
uint64_t *gid_count_p = cast_to_typeof(gid_count_p) gid; // make gcc --happy about -Wstrict-aliasing uint64_t *gid_count_p = cast_to_typeof(gid_count_p) gid; // make gcc --happy about -Wstrict-aliasing
*gid_count_p = gid_val; *gid_count_p = gid_val;
int rr = txn->prepare(txn, gid); int rr = txn->prepare(txn, gid, 0);
assert_zero(rr); assert_zero(rr);
} }
if (r == 0) { if (r == 0) {
......
...@@ -136,7 +136,7 @@ static void create_prepared_txn(void) { ...@@ -136,7 +136,7 @@ static void create_prepared_txn(void) {
for (int i = 0; i < 8+9; i++) { for (int i = 0; i < 8+9; i++) {
xid.data[i] = i; xid.data[i] = i;
} }
r = txn->xa_prepare(txn, &xid); r = txn->xa_prepare(txn, &xid, 0);
CKERR(r); CKERR(r);
// discard the txn so that we can close the env and run xa recovery later // discard the txn so that we can close the env and run xa recovery later
......
...@@ -136,7 +136,7 @@ static void create_prepared_txn(void) { ...@@ -136,7 +136,7 @@ static void create_prepared_txn(void) {
for (int i = 0; i < 8+9; i++) { for (int i = 0; i < 8+9; i++) {
xid.data[i] = i; xid.data[i] = i;
} }
r = txn->xa_prepare(txn, &xid); r = txn->xa_prepare(txn, &xid, 0);
CKERR(r); CKERR(r);
// discard the txn so that we can close the env and run xa recovery later // discard the txn so that we can close the env and run xa recovery later
......
...@@ -138,7 +138,7 @@ static void create_prepared_txn(void) { ...@@ -138,7 +138,7 @@ static void create_prepared_txn(void) {
for (int i = 0; i < 8+9; i++) { for (int i = 0; i < 8+9; i++) {
xid.data[i] = i; xid.data[i] = i;
} }
r = txn->xa_prepare(txn, &xid); r = txn->xa_prepare(txn, &xid, 0);
CKERR(r); CKERR(r);
// discard the txn so that we can close the env and run xa recovery later // discard the txn so that we can close the env and run xa recovery later
......
...@@ -139,7 +139,7 @@ static void create_prepared_txn(void) { ...@@ -139,7 +139,7 @@ static void create_prepared_txn(void) {
for (int i = 0; i < 8+9; i++) { for (int i = 0; i < 8+9; i++) {
xid.data[i] = i; xid.data[i] = i;
} }
r = txn->xa_prepare(txn, &xid); r = txn->xa_prepare(txn, &xid, 0);
CKERR(r); CKERR(r);
// discard the txn so that we can close the env and run xa recovery later // discard the txn so that we can close the env and run xa recovery later
......
...@@ -769,7 +769,7 @@ c_remove_restriction(DBC *dbc) { ...@@ -769,7 +769,7 @@ c_remove_restriction(DBC *dbc) {
} }
static void static void
c_set_check_interrupt_callback(DBC* dbc, bool (*interrupt_callback)(void*), void *extra) { c_set_check_interrupt_callback(DBC* dbc, bool (*interrupt_callback)(void*, uint64_t), void *extra) {
toku_ft_cursor_set_check_interrupt_cb(dbc_ftcursor(dbc), interrupt_callback, extra); toku_ft_cursor_set_check_interrupt_cb(dbc_ftcursor(dbc), interrupt_callback, extra);
} }
......
...@@ -240,7 +240,7 @@ static int toku_txn_abort(DB_TXN * txn, ...@@ -240,7 +240,7 @@ static int toku_txn_abort(DB_TXN * txn,
return r; return r;
} }
static int toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { static int toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid, uint32_t flags) {
int r = 0; int r = 0;
if (!txn) { if (!txn) {
r = EINVAL; r = EINVAL;
...@@ -273,9 +273,11 @@ static int toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { ...@@ -273,9 +273,11 @@ static int toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
} }
assert(!db_txn_struct_i(txn)->child); assert(!db_txn_struct_i(txn)->child);
int nosync;
nosync = (flags & DB_TXN_NOSYNC)!=0 || (db_txn_struct_i(txn)->flags&DB_TXN_NOSYNC);
TOKUTXN ttxn; TOKUTXN ttxn;
ttxn = db_txn_struct_i(txn)->tokutxn; ttxn = db_txn_struct_i(txn)->tokutxn;
toku_txn_prepare_txn(ttxn, xid); toku_txn_prepare_txn(ttxn, xid, nosync);
TOKULOGGER logger; TOKULOGGER logger;
logger = txn->mgrp->i->logger; logger = txn->mgrp->i->logger;
LSN do_fsync_lsn; LSN do_fsync_lsn;
...@@ -292,14 +294,14 @@ exit: ...@@ -292,14 +294,14 @@ exit:
// requires: must hold the multi operation lock. it is // requires: must hold the multi operation lock. it is
// released in toku_txn_xa_prepare before the fsync. // released in toku_txn_xa_prepare before the fsync.
static int toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) { static int toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE], uint32_t flags) {
TOKU_XA_XID xid; TOKU_XA_XID xid;
TOKU_ANNOTATE_NEW_MEMORY(&xid, sizeof(xid)); TOKU_ANNOTATE_NEW_MEMORY(&xid, sizeof(xid));
xid.formatID=0x756b6f54; // "Toku" xid.formatID=0x756b6f54; // "Toku"
xid.gtrid_length=DB_GID_SIZE/2; // The maximum allowed gtrid length is 64. See the XA spec in source:/import/opengroup.org/C193.pdf page 20. xid.gtrid_length=DB_GID_SIZE/2; // The maximum allowed gtrid length is 64. See the XA spec in source:/import/opengroup.org/C193.pdf page 20.
xid.bqual_length=DB_GID_SIZE/2; // The maximum allowed bqual length is 64. xid.bqual_length=DB_GID_SIZE/2; // The maximum allowed bqual length is 64.
memcpy(xid.data, gid, DB_GID_SIZE); memcpy(xid.data, gid, DB_GID_SIZE);
return toku_txn_xa_prepare(txn, &xid); return toku_txn_xa_prepare(txn, &xid, flags);
} }
static int toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { static int toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
...@@ -427,6 +429,10 @@ static bool toku_txn_is_prepared(DB_TXN *txn) { ...@@ -427,6 +429,10 @@ static bool toku_txn_is_prepared(DB_TXN *txn) {
return toku_txn_get_state(ttxn) == TOKUTXN_PREPARING; return toku_txn_get_state(ttxn) == TOKUTXN_PREPARING;
} }
static DB_TXN *toku_txn_get_child(DB_TXN *txn) {
return db_txn_struct_i(txn)->child;
}
static inline void txn_func_init(DB_TXN *txn) { static inline void txn_func_init(DB_TXN *txn) {
#define STXN(name) txn->name = locked_txn_ ## name #define STXN(name) txn->name = locked_txn_ ## name
STXN(abort); STXN(abort);
...@@ -444,6 +450,7 @@ static inline void txn_func_init(DB_TXN *txn) { ...@@ -444,6 +450,7 @@ static inline void txn_func_init(DB_TXN *txn) {
#undef SUTXN #undef SUTXN
txn->id64 = toku_txn_id64; txn->id64 = toku_txn_id64;
txn->is_prepared = toku_txn_is_prepared; txn->is_prepared = toku_txn_is_prepared;
txn->get_child = toku_txn_get_child;
} }
// //
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment