Commit 5a6cd59d authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

#refs 5133 merge undo-do algorithm changes to main. now, we read a row from...

#refs 5133 merge undo-do algorithm changes to main. now, we read a row from the source db AND gather up the leaf entry's provisional data in the cursor callback, which makes it atomic with respect to commit and abort on that DB. had to fix a few tests after changing some leaf entry signatures


git-svn-id: file:///svn/toku/tokudb@44949 c7de825b-a66e-492c-adef-691d508d4ae1
parent f5d76ac4
...@@ -13,8 +13,6 @@ ...@@ -13,8 +13,6 @@
// - does not perform snapshot reads. it reads everything, including uncommitted. // - does not perform snapshot reads. it reads everything, including uncommitted.
// //
// A LE_CURSOR is good for scanning a FT from beginning to end. Useful for hot indexing. // A LE_CURSOR is good for scanning a FT from beginning to end. Useful for hot indexing.
//
// It caches the key that is was last positioned over to speed up key comparisions.
struct le_cursor { struct le_cursor {
// TODO: remove DBs from the ft layer comparison function // TODO: remove DBs from the ft layer comparison function
...@@ -22,26 +20,22 @@ struct le_cursor { ...@@ -22,26 +20,22 @@ struct le_cursor {
// use a fake db for comparisons. // use a fake db for comparisons.
struct __toku_db fake_db; struct __toku_db fake_db;
FT_CURSOR ft_cursor; FT_CURSOR ft_cursor;
DBT key; // the key that the le cursor is positioned at
// TODO a better implementation would fetch the key from the brt cursor
bool neg_infinity; // true when the le cursor is positioned at -infinity (initial setting) bool neg_infinity; // true when the le cursor is positioned at -infinity (initial setting)
bool pos_infinity; // true when the le cursor is positioned at +infinity (when _next returns DB_NOTFOUND) bool pos_infinity; // true when the le cursor is positioned at +infinity (when _next returns DB_NOTFOUND)
}; };
int int
toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN txn) { toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE ft_handle, TOKUTXN txn) {
int result = 0; int result = 0;
LE_CURSOR le_cursor = (LE_CURSOR) toku_malloc(sizeof (struct le_cursor)); LE_CURSOR le_cursor = (LE_CURSOR) toku_malloc(sizeof (struct le_cursor));
if (le_cursor == NULL) { if (le_cursor == NULL) {
result = errno; result = errno;
} }
else { else {
result = toku_ft_cursor(brt, &le_cursor->ft_cursor, txn, false, false); result = toku_ft_cursor(ft_handle, &le_cursor->ft_cursor, txn, false, false);
if (result == 0) { if (result == 0) {
// TODO move the leaf mode to the brt cursor constructor // TODO move the leaf mode to the ft cursor constructor
toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor); toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor);
toku_init_dbt(&le_cursor->key);
le_cursor->key.flags = DB_DBT_REALLOC;
le_cursor->neg_infinity = true; le_cursor->neg_infinity = true;
le_cursor->pos_infinity = false; le_cursor->pos_infinity = false;
// zero out the fake DB. this is a rare operation so it's not too slow. // zero out the fake DB. this is a rare operation so it's not too slow.
...@@ -61,46 +55,26 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN txn) { ...@@ -61,46 +55,26 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN txn) {
int int
toku_le_cursor_close(LE_CURSOR le_cursor) { toku_le_cursor_close(LE_CURSOR le_cursor) {
int result = toku_ft_cursor_close(le_cursor->ft_cursor); int result = toku_ft_cursor_close(le_cursor->ft_cursor);
toku_destroy_dbt(&le_cursor->key);
toku_free(le_cursor); toku_free(le_cursor);
return result; return result;
} }
// this implementation copies the key and leafentry into the supplied DBTs. // Move to the next leaf entry under the LE_CURSOR
// this may be too slow. an alternative implementation could avoid copying the // Success: returns zero, calls the getf callback with the getf_v parameter
// key by fetching the key from the brt cursor, and could avoid copying the leaf entry // Failure: returns a non-zero error number
// by processing the leaf entry in the brt cursor callback.
struct le_cursor_callback_arg {
DBT *key, *val;
};
// copy the key and the leaf entry to the given DBTs
static int
le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *v, bool lock_only) {
if (lock_only) {
; // do nothing
} else {
struct le_cursor_callback_arg *arg = (struct le_cursor_callback_arg *) v;
toku_dbt_set(keylen, key, arg->key, NULL);
toku_dbt_set(vallen, val, arg->val, NULL);
}
return 0;
}
int int
toku_le_cursor_next(LE_CURSOR le_cursor, DBT *le) { toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) {
int result; int result;
if (le_cursor->pos_infinity) if (le_cursor->pos_infinity) {
result = DB_NOTFOUND; result = DB_NOTFOUND;
else { } else {
le_cursor->neg_infinity = false; le_cursor->neg_infinity = false;
struct le_cursor_callback_arg arg = { &le_cursor->key, le }; // TODO replace this with a non deprecated function. Which?
// TODO replace this with a non deprecated function result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_NEXT);
result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, le_cursor_callback, &arg, DB_NEXT); if (result == DB_NOTFOUND) {
if (result == DB_NOTFOUND)
le_cursor->pos_infinity = true; le_cursor->pos_infinity = true;
} }
}
return result; return result;
} }
...@@ -115,9 +89,10 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { ...@@ -115,9 +89,10 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) {
// get the comparison function and descriptor from the cursor's ft // get the comparison function and descriptor from the cursor's ft
FT_HANDLE ft_handle = le_cursor->ft_cursor->ft_handle; FT_HANDLE ft_handle = le_cursor->ft_cursor->ft_handle;
ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle); ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle);
// store the descriptor in the fake DB to do a key comparison
le_cursor->fake_db.cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle); le_cursor->fake_db.cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle);
int r = keycompare(&le_cursor->fake_db, &le_cursor->key, key); // get the current position from the cursor and compare it to the given key.
DBT *cursor_key = &le_cursor->ft_cursor->key;
int r = keycompare(&le_cursor->fake_db, cursor_key, key);
if (r < 0) { if (r < 0) {
result = true; // key is right of the cursor key result = true; // key is right of the cursor key
} else { } else {
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
#ifndef LE_CURSOR_H #ifndef LE_CURSOR_H
#define LE_CURSOR_H #define LE_CURSOR_H
#include "ft-ops.h"
// A leaf entry cursor (LE_CURSOR) is a special type of FT_CURSOR that visits all of the leaf entries in a tree // A leaf entry cursor (LE_CURSOR) is a special type of FT_CURSOR that visits all of the leaf entries in a tree
// and returns the leaf entry to the caller. It maintains a copy of the key that it was last positioned over to // and returns the leaf entry to the caller. It maintains a copy of the key that it was last positioned over to
// speed up key comparisons with a given key. For example, the hot indexing could use the _key_right_of_cursor // speed up key comparisons with a given key. For example, the hot indexing could use the _key_right_of_cursor
...@@ -27,10 +29,10 @@ int toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN tx ...@@ -27,10 +29,10 @@ int toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE brt, TOKUTXN tx
// Failure: returns a non-zero error number // Failure: returns a non-zero error number
int toku_le_cursor_close(LE_CURSOR le_cursor); int toku_le_cursor_close(LE_CURSOR le_cursor);
// Retrieve the next leaf entry under the LE_CURSOR // Move to the next leaf entry under the LE_CURSOR
// Success: returns zero, stores the leaf entry key into the key dbt, and the leaf entry into the val dbt // Success: returns zero, calls the getf callback with the getf_v parameter
// Failure: returns a non-zero error number // Failure: returns a non-zero error number
int toku_le_cursor_next(LE_CURSOR le_cursor, DBT *le); int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v);
// Return TRUE if the key is to the right of the LE_CURSOR position. that is, current cursor key < given key // Return TRUE if the key is to the right of the LE_CURSOR position. that is, current cursor key < given key
// Otherwise returns FALSE when the key is at or to the left of the LE_CURSOR position. that is, current cursor key >= given key // Otherwise returns FALSE when the key is at or to the left of the LE_CURSOR position. that is, current cursor key >= given key
......
...@@ -13,6 +13,21 @@ ...@@ -13,6 +13,21 @@
static TOKUTXN const null_txn = 0; static TOKUTXN const null_txn = 0;
static DB * const null_db = 0; static DB * const null_db = 0;
// Cursor callback for the tests: copy the value bytes handed to us by the
// cursor into the DBT passed through `extra`. The key is ignored, and a
// lock-only invocation copies nothing. Always returns 0.
static int
get_next_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
    if (lock_only) {
        return 0;
    }
    DBT *out = extra;
    toku_dbt_set(vallen, val, out, NULL);
    return 0;
}
// Advance the LE_CURSOR by one entry and copy that entry's value into `val`
// (via get_next_callback). Returns whatever toku_le_cursor_next() returns.
static int
le_cursor_get_next(LE_CURSOR cursor, DBT *val) {
    return toku_le_cursor_next(cursor, get_next_callback, val);
}
static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT *a, const DBT *b) { static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
...@@ -141,7 +156,7 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -141,7 +156,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
int i; int i;
for (i=0; ; i++) { for (i=0; ; i++) {
error = toku_le_cursor_next(cursor, &val); error = le_cursor_get_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
......
...@@ -16,6 +16,21 @@ ...@@ -16,6 +16,21 @@
static TOKUTXN const null_txn = 0; static TOKUTXN const null_txn = 0;
static DB * const null_db = 0; static DB * const null_db = 0;
// Cursor callback for the tests: copy the value bytes handed to us by the
// cursor into the DBT passed through `extra`. The key is ignored, and a
// lock-only invocation copies nothing. Always returns 0.
static int
get_next_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
    if (lock_only) {
        return 0;
    }
    DBT *out = extra;
    toku_dbt_set(vallen, val, out, NULL);
    return 0;
}
// Advance the LE_CURSOR by one entry and copy that entry's value into `val`
// (via get_next_callback). Returns whatever toku_le_cursor_next() returns.
static int
le_cursor_get_next(LE_CURSOR cursor, DBT *val) {
    return toku_le_cursor_next(cursor, get_next_callback, val);
}
static int static int
test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) { test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
...@@ -148,7 +163,7 @@ test_pos_infinity(const char *fname, int n) { ...@@ -148,7 +163,7 @@ test_pos_infinity(const char *fname, int n) {
int i; int i;
for (i = 0; ; i++) { for (i = 0; ; i++) {
error = toku_le_cursor_next(cursor, &val); error = le_cursor_get_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
...@@ -210,7 +225,7 @@ test_between(const char *fname, int n) { ...@@ -210,7 +225,7 @@ test_between(const char *fname, int n) {
int i; int i;
for (i = 0; ; i++) { for (i = 0; ; i++) {
// move the LE_CURSOR forward // move the LE_CURSOR forward
error = toku_le_cursor_next(cursor, &val); error = le_cursor_get_next(cursor, &val);
if (error != 0) if (error != 0)
break; break;
......
...@@ -13,6 +13,21 @@ ...@@ -13,6 +13,21 @@
static TOKUTXN const null_txn = 0; static TOKUTXN const null_txn = 0;
static DB * const null_db = 0; static DB * const null_db = 0;
// Cursor callback for the tests: copy the value bytes handed to us by the
// cursor into the DBT passed through `extra`. The key is ignored, and a
// lock-only invocation copies nothing. Always returns 0.
static int
get_next_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
    if (lock_only) {
        return 0;
    }
    DBT *out = extra;
    toku_dbt_set(vallen, val, out, NULL);
    return 0;
}
// Advance the LE_CURSOR by one entry and copy that entry's value into `val`
// (via get_next_callback). Returns whatever toku_le_cursor_next() returns.
static int
le_cursor_get_next(LE_CURSOR cursor, DBT *val) {
    return toku_le_cursor_next(cursor, get_next_callback, val);
}
static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT *a, const DBT *b) { static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
...@@ -109,7 +124,7 @@ walk_tree(const char *fname, int n) { ...@@ -109,7 +124,7 @@ walk_tree(const char *fname, int n) {
for (i = 0; ; i++) { for (i = 0; ; i++) {
error = TOKUDB_TRY_AGAIN; error = TOKUDB_TRY_AGAIN;
while (error == TOKUDB_TRY_AGAIN) { while (error == TOKUDB_TRY_AGAIN) {
error = toku_le_cursor_next(cursor, &val); error = le_cursor_get_next(cursor, &val);
} }
if (error != 0) if (error != 0)
break; break;
......
...@@ -23,6 +23,19 @@ struct indexer_commit_keys { ...@@ -23,6 +23,19 @@ struct indexer_commit_keys {
DBT *keys; // the variable length keys array DBT *keys; // the variable length keys array
}; };
// a ule and all of its provisional txn info
// used by the undo-do algorithm to gather up ule provisional info in
// a cursor callback that provides exclusive access to the source DB
// with respect to txn commit and abort
//
// the three prov_* arrays are parallel: entry i describes the i'th
// provisional record of the ule. they are allocated with num_provisional
// entries by ule_prov_info_init() and only when num_provisional > 0.
struct ule_prov_info {
// the unpacked leaf entry this info describes
ULEHANDLE ule;
// number of provisional records in the ule (ule_get_num_provisional)
uint32_t num_provisional;
// number of committed records in the ule (ule_get_num_committed)
uint32_t num_committed;
// txn id of each provisional record
TXNID *prov_ids;
// txn found for each provisional id, or NULL when none was looked up or found
TOKUTXN *prov_txns;
// state recorded for each provisional id
TOKUTXN_STATE *prov_states;
};
struct __toku_indexer_internal { struct __toku_indexer_internal {
DB_ENV *env; DB_ENV *env;
DB_TXN *txn; DB_TXN *txn;
...@@ -63,6 +76,6 @@ void indexer_undo_do_init(DB_INDEXER *indexer); ...@@ -63,6 +76,6 @@ void indexer_undo_do_init(DB_INDEXER *indexer);
void indexer_undo_do_destroy(DB_INDEXER *indexer); void indexer_undo_do_destroy(DB_INDEXER *indexer);
int indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule); int indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, struct ule_prov_info *prov_info);
#endif #endif
...@@ -177,49 +177,6 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { ...@@ -177,49 +177,6 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
return result; return result;
} }
static void fill_prov_info(
ULEHANDLE ule,
TXNID* prov_ids,
TOKUTXN_STATE* prov_states,
TOKUTXN* prov_txns,
DB_INDEXER *indexer
)
{
uint32_t num_provisional = ule_get_num_provisional(ule);
uint32_t num_committed = ule_get_num_committed(ule);
DB_ENV *env = indexer->i->env;
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
toku_txn_manager_suspend(txn_manager);
for (uint32_t i = 0; i < num_provisional; i++) {
UXRHANDLE uxr = ule_get_uxr(ule, num_committed+i);
prov_ids[i] = uxr_get_txnid(uxr);
if (indexer->i->test_xid_state) {
prov_states[i] = indexer->i->test_xid_state(indexer, prov_ids[i]);
prov_txns[i] = NULL;
}
else {
TOKUTXN txn = NULL;
toku_txn_manager_id2txn_unlocked(
txn_manager,
prov_ids[i],
&txn
);
prov_txns[i] = txn;
if (txn) {
prov_states[i] = toku_txn_get_state(txn);
if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
// pin
toku_txn_manager_pin_live_txn_unlocked(txn_manager, txn);
}
}
else {
prov_states[i] = TOKUTXN_RETIRED;
}
}
}
toku_txn_manager_resume(txn_manager);
}
static void release_txns( static void release_txns(
ULEHANDLE ule, ULEHANDLE ule,
TOKUTXN_STATE* prov_states, TOKUTXN_STATE* prov_states,
...@@ -255,24 +212,24 @@ exit: ...@@ -255,24 +212,24 @@ exit:
} }
static int static int
indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, struct ule_prov_info *prov_info) {
int result = 0; int result = 0;
uint32_t num_committed = ule_get_num_committed(ule);
uint32_t num_provisional = ule_get_num_provisional(ule);
indexer_commit_keys_set_empty(&indexer->i->commit_keys); indexer_commit_keys_set_empty(&indexer->i->commit_keys);
// init the xids to the root xid // init the xids to the root xid
XIDS xids = xids_get_root_xids(); XIDS xids = xids_get_root_xids();
TXNID prov_ids[num_provisional]; uint32_t num_provisional = prov_info->num_provisional;
TOKUTXN_STATE prov_states[num_provisional]; uint32_t num_committed = prov_info->num_committed;
TOKUTXN prov_txns[num_provisional]; TXNID *prov_ids = prov_info->prov_ids;
memset(prov_txns, 0, sizeof(prov_txns)); TOKUTXN *prov_txns = prov_info->prov_txns;
TOKUTXN_STATE *prov_states = prov_info->prov_states;
// nothing to do if there's nothing provisional
if (num_provisional == 0) { if (num_provisional == 0) {
goto exit; goto exit;
} }
fill_prov_info(ule, prov_ids, prov_states, prov_txns, indexer);
TXNID outermost_xid_state = prov_states[0]; TXNID outermost_xid_state = prov_states[0];
// scan the provisional stack from the outermost to the innermost transaction record // scan the provisional stack from the outermost to the innermost transaction record
...@@ -407,13 +364,14 @@ exit: ...@@ -407,13 +364,14 @@ exit:
} }
int int
indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, struct ule_prov_info *prov_info) {
int result = indexer_undo_do_committed(indexer, hotdb, ule); int result = indexer_undo_do_committed(indexer, hotdb, ule);
if (result == 0) if (result == 0) {
result = indexer_undo_do_provisional(indexer, hotdb, ule); result = indexer_undo_do_provisional(indexer, hotdb, ule, prov_info);
}
if ( indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK ) if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK) {
result = EINVAL; result = EINVAL;
}
return result; return result;
} }
......
...@@ -67,10 +67,6 @@ toku_indexer_get_status(INDEXER_STATUS statp) { ...@@ -67,10 +67,6 @@ toku_indexer_get_status(INDEXER_STATUS statp) {
#define STATUS_VALUE(x) indexer_status.status[x].value.num #define STATUS_VALUE(x) indexer_status.status[x].value.num
#include "indexer-internal.h" #include "indexer-internal.h"
static int build_index(DB_INDEXER *indexer); static int build_index(DB_INDEXER *indexer);
...@@ -148,6 +144,9 @@ toku_indexer_unlock(DB_INDEXER* indexer) { ...@@ -148,6 +144,9 @@ toku_indexer_unlock(DB_INDEXER* indexer) {
toku_mutex_unlock(&indexer->i->indexer_lock); toku_mutex_unlock(&indexer->i->indexer_lock);
} }
// forward declare the test-only wrapper function for undo-do
static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
int int
toku_indexer_create_indexer(DB_ENV *env, toku_indexer_create_indexer(DB_ENV *env,
DB_TXN *txn, DB_TXN *txn,
...@@ -176,7 +175,7 @@ toku_indexer_create_indexer(DB_ENV *env, ...@@ -176,7 +175,7 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->i->indexer_flags = indexer_flags; indexer->i->indexer_flags = indexer_flags;
indexer->i->loop_mod = 1000; // call poll_func every 1000 rows indexer->i->loop_mod = 1000; // call poll_func every 1000 rows
indexer->i->estimated_rows = 0; indexer->i->estimated_rows = 0;
indexer->i->undo_do = indexer_undo_do; // TEST export the undo do function indexer->i->undo_do = test_indexer_undo_do; // TEST export the undo do function
XCALLOC_N(N, indexer->i->fnums); XCALLOC_N(N, indexer->i->fnums);
if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; } if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; }
...@@ -280,48 +279,175 @@ toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) { ...@@ -280,48 +279,175 @@ toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) {
return toku_le_cursor_is_key_greater(indexer->i->lec, key); return toku_le_cursor_is_key_greater(indexer->i->lec, key);
} }
// Initialize provisional info for the given ule.
//
// Records the ule together with its provisional and committed counts, and
// allocates one id, one state, and one txn slot for each provisional entry
// in the ule. The ule remains owned by the caller, not the prov info.
//
// When the ule has no provisional entries nothing is allocated and the
// three array pointers are set to NULL, which is what
// ule_prov_info_destroy() expects in that case.
static void
ule_prov_info_init(struct ule_prov_info *prov_info, ULEHANDLE ule) {
    prov_info->ule = ule;
    prov_info->num_provisional = ule_get_num_provisional(ule);
    prov_info->num_committed = ule_get_num_committed(ule);

    // do not rely on the caller having zeroed the struct beforehand
    prov_info->prov_ids = NULL;
    prov_info->prov_states = NULL;
    prov_info->prov_txns = NULL;

    uint32_t n = prov_info->num_provisional;
    if (n > 0) {
        // size every array by its element type. sizing by the pointer
        // itself only works by accident, when an element happens to be
        // no larger than a pointer.
        prov_info->prov_ids = toku_malloc(n * sizeof(prov_info->prov_ids[0]));
        prov_info->prov_states = toku_malloc(n * sizeof(prov_info->prov_states[0]));
        prov_info->prov_txns = toku_malloc(n * sizeof(prov_info->prov_txns[0]));
        // this function cannot report failure, and the arrays are written
        // unconditionally afterwards, so fail loudly on a failed allocation
        invariant(prov_info->prov_ids != NULL);
        invariant(prov_info->prov_states != NULL);
        invariant(prov_info->prov_txns != NULL);
    }
}
// Release the arrays that ule_prov_info_init() may have allocated.
// The ule itself is not touched here.
static void
ule_prov_info_destroy(struct ule_prov_info *prov_info) {
    if (prov_info->num_provisional == 0) {
        // nothing provisional means nothing was allocated
        invariant(prov_info->prov_ids == NULL);
        invariant(prov_info->prov_states == NULL);
        invariant(prov_info->prov_txns == NULL);
        return;
    }
    toku_free(prov_info->prov_ids);
    toku_free(prov_info->prov_states);
    toku_free(prov_info->prov_txns);
}
// Fill in the txn id, txn state, and txn of every provisional entry of the
// prov info's ule. The prov info must already have been set up by
// ule_prov_info_init() so that the arrays have room for each entry.
//
// When the indexer has a test_xid_state hook installed, the state comes
// from the hook and no txn is looked up. Otherwise the txn is looked up in
// the txn manager; a txn that is not found is recorded as TOKUTXN_RETIRED,
// and a txn that is live or preparing is pinned.
static void
indexer_fill_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
    ULEHANDLE ule = prov_info->ule;
    const uint32_t n_prov = prov_info->num_provisional;
    const uint32_t n_committed = prov_info->num_committed;
    TXNID *ids = prov_info->prov_ids;
    TOKUTXN_STATE *states = prov_info->prov_states;
    TOKUTXN *txns = prov_info->prov_txns;

    DB_ENV *env = indexer->i->env;
    TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);

    // keep the txn manager suspended for the whole scan, so txn state is
    // inspected and live txns are pinned under its lock
    toku_txn_manager_suspend(txn_manager);
    for (uint32_t k = 0; k < n_prov; k++) {
        UXRHANDLE uxr = ule_get_uxr(ule, n_committed + k);
        ids[k] = uxr_get_txnid(uxr);

        if (indexer->i->test_xid_state) {
            // test hook supplies the state; there is no real txn to record
            states[k] = indexer->i->test_xid_state(indexer, ids[k]);
            txns[k] = NULL;
            continue;
        }

        TOKUTXN txn = NULL;
        toku_txn_manager_id2txn_unlocked(txn_manager, ids[k], &txn);
        txns[k] = txn;
        if (!txn) {
            states[k] = TOKUTXN_RETIRED;
            continue;
        }

        states[k] = toku_txn_get_state(txn);
        if (states[k] == TOKUTXN_LIVE || states[k] == TOKUTXN_PREPARING) {
            // pin this live txn so it can't commit or abort until we're done with it
            toku_txn_manager_pin_live_txn_unlocked(txn_manager, txn);
        }
    }
    toku_txn_manager_resume(txn_manager);
}
// extra argument passed to le_cursor_callback() through toku_le_cursor_next()
struct le_cursor_extra {
// indexer handed on to indexer_fill_prov_info()
DB_INDEXER *indexer;
// caller-provided prov info that the callback initializes and fills in
struct ule_prov_info *prov_info;
};
// Cursor callback for the indexer's LE_CURSOR. Because it runs as a cursor
// callback, it is synchronized with other db operations using cachetable
// pair locks; no txn can commit on this db while it runs, so this is where
// the provisional info for the newly read ule gets captured.
//
// `extra` is a struct le_cursor_extra. On a real element, the value bytes
// are duplicated, turned into a ule, and the extra's prov info is
// initialized and filled from it. Always returns 0.
static int
le_cursor_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN vallen, bytevec val, void *extra, bool lock_only) {
    // do nothing if only locking or val==NULL, meaning there are no more elements
    if (lock_only || val == NULL) {
        return 0;
    }

    struct le_cursor_extra *args = extra;

    // TODO(John): Do we need to actually copy this ule and save it after
    // copying all of the provisional info? or is the info all we need?
    // NOTE(review): nothing visible here frees this copy; confirm whether
    // the ule takes ownership of the buffer or whether it needs its own free.
    void *le_copy = toku_xmemdup(val, vallen);
    ULEHANDLE ule = toku_ule_create(le_copy);
    invariant(ule);

    ule_prov_info_init(args->prov_info, ule);
    indexer_fill_prov_info(args->indexer, args->prov_info);
    return 0;
}
// Step the indexer's le cursor to the next entry and, through
// le_cursor_callback(), build its ule and provisional info into the
// prov_info struct supplied by the caller. The caller is responsible for
// cleaning up the ule info once it is done with it.
// Returns the result of toku_le_cursor_next().
static int
get_next_ule_with_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
    struct le_cursor_extra callback_args;
    callback_args.indexer = indexer;
    callback_args.prov_info = prov_info;
    return toku_le_cursor_next(indexer->i->lec, le_cursor_callback, &callback_args);
}
static int static int
build_index(DB_INDEXER *indexer) { build_index(DB_INDEXER *indexer) {
int result = 0; int result = 0;
DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
DBT le; toku_init_dbt_flags(&le, DB_DBT_REALLOC);
bool done = false; bool done = false;
for (uint64_t loop_count = 0; !done; loop_count++) { for (uint64_t loop_count = 0; !done; loop_count++) {
toku_indexer_lock(indexer); toku_indexer_lock(indexer);
result = toku_le_cursor_next(indexer->i->lec, &le); // grab the next leaf entry and get its provisional info. we'll
// need the provisional info for the undo-do algorithm, and we get
// it here so it can be read atomically with respect to txn commit
// and abort.
//
// this allocates space for the prov info, so we have to destroy it
// when we're done.
struct ule_prov_info prov_info;
memset(&prov_info, 0, sizeof(prov_info));
result = get_next_ule_with_prov_info(indexer, &prov_info);
if (result != 0) { if (result != 0) {
invariant(prov_info.ule == NULL);
done = true; done = true;
if (result == DB_NOTFOUND) if (result == DB_NOTFOUND) {
result = 0; // all done, normal way to exit loop successfully result = 0; // all done, normal way to exit loop successfully
} }
}
else { else {
// this code may be faster ule malloc/free is not done every time invariant(prov_info.ule);
ULEHANDLE ule = toku_ule_create(le.data); ULEHANDLE ule = prov_info.ule;
for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) { for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
DB *db = indexer->i->dest_dbs[which_db]; DB *db = indexer->i->dest_dbs[which_db];
result = indexer_undo_do(indexer, db, ule); result = indexer_undo_do(indexer, db, ule, &prov_info);
if ( (result != 0) && (indexer->i->error_callback != NULL)) { if ((result != 0) && (indexer->i->error_callback != NULL)) {
// grab the key and call the error callback
DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL); toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL);
indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra); indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
toku_destroy_dbt(&key);
} }
toku_ule_free(prov_info.ule);
} }
toku_ule_free(ule);
} }
toku_indexer_unlock(indexer); toku_indexer_unlock(indexer);
ule_prov_info_destroy(&prov_info);
if (result == 0) if (result == 0) {
result = maybe_call_poll_func(indexer, loop_count); result = maybe_call_poll_func(indexer, loop_count);
if (result != 0) }
if (result != 0) {
done = true; done = true;
} }
}
toku_destroy_dbt(&key);
toku_destroy_dbt(&le);
// post index creation cleanup // post index creation cleanup
// - optimize? // - optimize?
...@@ -334,7 +460,6 @@ build_index(DB_INDEXER *indexer) { ...@@ -334,7 +460,6 @@ build_index(DB_INDEXER *indexer) {
(void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1); (void) __sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
} }
return result; return result;
} }
...@@ -349,11 +474,7 @@ close_indexer(DB_INDEXER *indexer) { ...@@ -349,11 +474,7 @@ close_indexer(DB_INDEXER *indexer) {
// to create them are not in the recovery log.) // to create them are not in the recovery log.)
DB_TXN *txn = indexer->i->txn; DB_TXN *txn = indexer->i->txn;
TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn;
//BRT brt; // caused a warning with -Wunused-but-set-variable
//DB *db;
for (int which_db = 0; which_db < indexer->i->N ; which_db++) { for (int which_db = 0; which_db < indexer->i->N ; which_db++) {
//db = indexer->i->dest_dbs[which_db];
//brt = db_struct_i(db)->ft_handle;
toku_txn_require_checkpoint_on_commit(tokutxn); toku_txn_require_checkpoint_on_commit(tokutxn);
} }
...@@ -449,6 +570,19 @@ toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) { ...@@ -449,6 +570,19 @@ toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) {
indexer->i->test_only_flags = flags; indexer->i->test_only_flags = flags;
} }
// Test-only convenience wrapper around indexer_undo_do(): it gathers the
// ule's provisional info, runs undo-do with it, and then destroys the
// provisional info again, so tests only have to supply the ule.
// Returns the result of indexer_undo_do().
static int
test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
    struct ule_prov_info info;
    memset(&info, 0, sizeof(info));

    ule_prov_info_init(&info, ule);
    indexer_fill_prov_info(indexer, &info);

    int result = indexer_undo_do(indexer, hotdb, ule, &info);

    ule_prov_info_destroy(&info);
    return result;
}
DB * DB *
toku_indexer_get_src_db(DB_INDEXER *indexer) { toku_indexer_get_src_db(DB_INDEXER *indexer) {
return indexer->i->src_db; return indexer->i->src_db;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment