/* -*- mode: C; c-basic-offset: 4 -*- */ /* * Copyright (c) 2010 Tokutek Inc. All rights reserved." * The technology is licensed by the Massachusetts Institute of Technology, * Rutgers State University of New Jersey, and the Research Foundation of * State University of New York at Stony Brook under United States of America * Serial No. 11/760379 and to the patents and/or patent applications resulting from it." */ /* * The indexer */ #include <stdio.h> #include <string.h> #include <toku_portability.h> #include "toku_assert.h" #include "ydb-internal.h" #include "le-cursor.h" #include "indexer.h" #include "brt-internal.h" #include "toku_atomic.h" #include "tokuconst.h" #include "brt.h" #include "mempool.h" #include "leafentry.h" #include "ule.h" #include "xids.h" #include "log-internal.h" // for now static INDEXER_STATUS_S status; #include "indexer-internal.h" static int build_index(DB_INDEXER *indexer); static int close_indexer(DB_INDEXER *indexer); static int abort_indexer(DB_INDEXER *indexer); static void free_indexer_resources(DB_INDEXER *indexer); static void free_indexer(DB_INDEXER *indexer); static int update_estimated_rows(DB_INDEXER *indexer); static int maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count); static int associate_indexer_with_hot_dbs(DB_INDEXER *indexer, DB *dest_dbs[], int N) { int result =0; for (int i = 0; i < N; i++) { result = toku_db_set_indexer(dest_dbs[i], indexer); if (result != 0) { for (int j = 0; j < i; j++) { int result2 = toku_db_set_indexer(dest_dbs[j], NULL); lazy_assert(result2 == 0); } } } return result; } static void disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) { for (int i = 0; i < indexer->i->N; i++) { int result = toku_db_set_indexer(indexer->i->dest_dbs[i], NULL); lazy_assert(result == 0); } } static void indexer_add_refs(DB_INDEXER *indexer) { toku_db_add_ref(indexer->i->src_db); for (int i = 0; i < indexer->i->N; i++) toku_db_add_ref(indexer->i->dest_dbs[i]); } static void indexer_release_refs(DB_INDEXER *indexer) { toku_db_release_ref(indexer->i->src_db); for (int i = 0; i < indexer->i->N; i++) toku_db_release_ref(indexer->i->dest_dbs[i]); } /* * free_indexer_resources() frees all of the resources associated with * struct __toku_indexer_internal * assumes any previously freed items set the field pointer to NULL */ static void free_indexer_resources(DB_INDEXER *indexer) { if ( indexer->i ) { if ( indexer->i->lec ) { le_cursor_close(indexer->i->lec); } if ( indexer->i->fnums ) { toku_free(indexer->i->fnums); indexer->i->fnums = NULL; } indexer_release_refs(indexer); // indexer->i toku_free(indexer->i); indexer->i = NULL; } } static void free_indexer(DB_INDEXER *indexer) { if ( indexer ) { free_indexer_resources(indexer); toku_free(indexer); indexer = NULL; } } int toku_indexer_create_indexer(DB_ENV *env, DB_TXN *txn, DB_INDEXER **indexerp, DB *src_db, int N, DB *dest_dbs[N], uint32_t db_flags[N], uint32_t indexer_flags) { int rval = 0; DB_INDEXER *indexer = 0; // set later when created *indexerp = NULL; rval = toku_grab_read_lock_on_directory (src_db, txn); if (rval == 0) { XCALLOC(indexer); // init to all zeroes (thus initializing the error_callback and poll_func) if ( !indexer ) { rval = ENOMEM; goto create_exit; } XCALLOC(indexer->i); // init to all zeroes (thus initializing all pointers to NULL) if ( !indexer->i ) { rval = ENOMEM; goto create_exit; } indexer->i->env = env; indexer->i->txn = txn; indexer->i->src_db = src_db; indexer->i->N = N; indexer->i->dest_dbs = dest_dbs; indexer->i->db_flags = db_flags; indexer->i->indexer_flags = indexer_flags; indexer->i->loop_mod = 1000; // call poll_func every 1000 rows indexer->i->estimated_rows = 0; indexer->i->undo_do = indexer_undo_do; // TEST export the undo do function XCALLOC_N(N, indexer->i->fnums); if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; } for(int i=0;i<indexer->i->N;i++) { indexer->i->fnums[i] = toku_cachefile_filenum(db_struct_i(dest_dbs[i])->brt->cf); } indexer->i->filenums.num = N; indexer->i->filenums.filenums = indexer->i->fnums; indexer->i->test_only_flags = 0; // for test use only indexer->set_error_callback = toku_indexer_set_error_callback; indexer->set_poll_function = toku_indexer_set_poll_function; indexer->build = build_index; indexer->close = close_indexer; indexer->abort = abort_indexer; // create and initialize the leafentry cursor rval = le_cursor_create(&indexer->i->lec, db_struct_i(src_db)->brt, db_txn_struct_i(txn)->tokutxn); if ( !indexer->i->lec ) { goto create_exit; } // 2954: add recovery and rollback entries LSN hot_index_lsn; // not used (yet) TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; FILENUMS filenums = indexer->i->filenums; rval = toku_brt_hot_index(NULL, ttxn, filenums, 1, &hot_index_lsn); } if (rval == 0) rval = associate_indexer_with_hot_dbs(indexer, dest_dbs, N); create_exit: if ( rval == 0 ) { indexer_add_refs(indexer); *indexerp = indexer; (void) toku_sync_fetch_and_increment_uint64(&status.create); (void) toku_sync_fetch_and_increment_uint32(&status.current); if ( status.current > status.max ) status.max = status.current; // not worth a lock to make threadsafe, may be inaccurate } else { (void) toku_sync_fetch_and_increment_uint64(&status.create_fail); free_indexer(indexer); } return rval; } int toku_indexer_set_poll_function(DB_INDEXER *indexer, int (*poll_func)(void *poll_extra, float progress), void *poll_extra) { invariant(indexer != NULL); indexer->i->poll_func = poll_func; indexer->i->poll_extra = poll_extra; return 0; } int toku_indexer_set_error_callback(DB_INDEXER *indexer, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra) { invariant(indexer != NULL); indexer->i->error_callback = error_cb; indexer->i->error_extra = error_extra; return 0; } int toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, DB *db, const DBT *key) { return is_key_right_of_le_cursor(indexer->i->lec, key, db); } static int build_index(DB_INDEXER *indexer) { int result = 0; DBT key; toku_init_dbt(&key); key.flags = DB_DBT_REALLOC; DBT le; toku_init_dbt(&le); le.flags = DB_DBT_REALLOC; BOOL done = FALSE; for (uint64_t loop_count = 0; !done; loop_count++) { toku_ydb_lock(); result = le_cursor_next(indexer->i->lec, &le); if (result != 0) { done = TRUE; if (result == DB_NOTFOUND) result = 0; // all done, normal way to exit loop successfully } else { ULEHANDLE ule = toku_ule_create(le.data); for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) { DB *db = indexer->i->dest_dbs[which_db]; result = indexer_undo_do(indexer, db, ule); if ( (result != 0) && (indexer->i->error_callback != NULL)) { toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL); indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra); } } toku_ule_free(ule); } toku_ydb_unlock_and_yield(1000); // if there is lock contention, then sleep for 1 millisecond after the unlock if (result == 0) result = maybe_call_poll_func(indexer, loop_count); if (result != 0) done = TRUE; } toku_destroy_dbt(&key); toku_destroy_dbt(&le); // post index creation cleanup // - optimize? // - garbage collect? // - unique checks? if ( result == 0 ) { (void) toku_sync_fetch_and_increment_uint64(&status.build); } else { (void) toku_sync_fetch_and_increment_uint64(&status.build_fail); } return result; } static void require_local_checkpoint (BRT brt, TOKUTXN txn) { toku_brtheader_lock(brt->h); toku_list_push(&txn->checkpoint_before_commit, &brt->h->checkpoint_before_commit_link); toku_brtheader_unlock(brt->h); } static int close_indexer(DB_INDEXER *indexer) { int r = 0; (void) toku_sync_fetch_and_decrement_uint32(&status.current); toku_ydb_lock(); { // Add all created dbs to the transaction's checkpoint_before_commit list. // (This will cause a local checkpoint of created index files, which is necessary // because these files are not necessarily on disk and all the operations // to create them are not in the recovery log.) DB_TXN *txn = indexer->i->txn; TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn; BRT brt; DB *db; for (int which_db = 0; which_db < indexer->i->N ; which_db++) { db = indexer->i->dest_dbs[which_db]; brt = db_struct_i(db)->brt; require_local_checkpoint(brt, tokutxn); } // Disassociate the indexer from the hot dbs disassociate_indexer_from_hot_dbs(indexer); } toku_ydb_unlock(); free_indexer(indexer); if ( r == 0 ) { (void) toku_sync_fetch_and_increment_uint64(&status.close); } else { (void) toku_sync_fetch_and_increment_uint64(&status.close_fail); } return r; } static int abort_indexer(DB_INDEXER *indexer) { (void) toku_sync_fetch_and_decrement_uint32(&status.current); (void) toku_sync_fetch_and_increment_uint64(&status.abort); free_indexer(indexer); return 0; } // derived from ha_tokudb::estimate_num_rows static int update_estimated_rows(DB_INDEXER *indexer) { DBT key; toku_init_dbt(&key); DBT data; toku_init_dbt(&data); DBC* crsr; DB_TXN* txn = NULL; uint64_t less, equal, greater; int is_exact; int error; DB *db = indexer->i->src_db; DB_ENV *db_env = indexer->i->env; error = db_env->txn_begin(db_env, 0, &txn, DB_READ_UNCOMMITTED); if (error) goto cleanup; error = db->cursor(db, txn, &crsr, 0); if (error) { goto cleanup; } error = crsr->c_get(crsr, &key, &data, DB_FIRST); if (error == DB_NOTFOUND) { indexer->i->estimated_rows = 0; error = 0; goto cleanup; } else if (error) { goto cleanup; } error = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact); if (error) { goto cleanup; } indexer->i->estimated_rows = equal + greater; error = 0; cleanup: if ( crsr != NULL ) { int rr = crsr->c_close(crsr); invariant(rr == 0); crsr = NULL; } txn->commit(txn, 0); return error; } static int maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count) { int result = 0; if ( indexer->i->poll_func != NULL && ( loop_count % indexer->i->loop_mod ) == 0 ) { int r __attribute__((unused)) = update_estimated_rows(indexer); // what happens if estimate_rows fails? // - currently does not modify estimate, which is probably sufficient float progress; if ( indexer->i->estimated_rows == 0 ) { progress = 1.0; } else { progress = (float)loop_count / (float)indexer->i->estimated_rows; } result = indexer->i->poll_func(indexer->i->poll_extra, progress); } return result; } void toku_indexer_get_status(INDEXER_STATUS s) { *s = status; } // this allows us to force errors under test. Flags are defined in indexer.h void toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) { invariant(indexer != NULL); indexer->i->test_only_flags = flags; } DB * toku_indexer_get_src_db(DB_INDEXER *indexer) { return indexer->i->src_db; }