Commit 64d9c582 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5197, check in fix

git-svn-id: file:///svn/toku/tokudb@45347 c7de825b-a66e-492c-adef-691d508d4ae1
parent da30a643
...@@ -145,6 +145,7 @@ struct ft_loader_s { ...@@ -145,6 +145,7 @@ struct ft_loader_s {
const char *temp_file_template; const char *temp_file_template;
CACHETABLE cachetable; CACHETABLE cachetable;
BOOL did_reserve_memory;
uint64_t reserved_memory; // how much memory are we allowed to use? uint64_t reserved_memory; // how much memory are we allowed to use?
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */ /* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
...@@ -241,7 +242,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -241,7 +242,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
ft_compare_func bt_compare_functions[/*N*/], ft_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn); TOKUTXN txn,
BOOL reserve_memory);
void toku_ft_loader_internal_destroy (FTLOADER bl, BOOL is_error); void toku_ft_loader_internal_destroy (FTLOADER bl, BOOL is_error);
......
...@@ -374,7 +374,8 @@ void toku_ft_loader_internal_destroy (FTLOADER bl, BOOL is_error) { ...@@ -374,7 +374,8 @@ void toku_ft_loader_internal_destroy (FTLOADER bl, BOOL is_error) {
toku_free(bl->fractal_queues); toku_free(bl->fractal_queues);
toku_free(bl->fractal_threads_live); toku_free(bl->fractal_threads_live);
if (bl->cachetable) if (bl->did_reserve_memory)
invariant(bl->cachetable);
toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory); toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory);
ft_loader_destroy_error_callback(&bl->error_callback); ft_loader_destroy_error_callback(&bl->error_callback);
...@@ -482,7 +483,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -482,7 +483,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
ft_compare_func bt_compare_functions[/*N*/], ft_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn) TOKUTXN txn,
BOOL reserve_memory)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread. // Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
{ {
FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC) FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
...@@ -490,10 +492,14 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -490,10 +492,14 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
bl->generate_row_for_put = g; bl->generate_row_for_put = g;
bl->cachetable = cachetable; bl->cachetable = cachetable;
if (bl->cachetable) if (reserve_memory && bl->cachetable) {
bl->did_reserve_memory = TRUE;
bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with). bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with).
else }
else {
bl->did_reserve_memory = FALSE;
bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB. bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
}
//printf("Reserved memory=%ld\n", bl->reserved_memory); //printf("Reserved memory=%ld\n", bl->reserved_memory);
bl->src_db = src_db; bl->src_db = src_db;
...@@ -579,7 +585,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, ...@@ -579,7 +585,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
ft_compare_func bt_compare_functions[/*N*/], ft_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn) TOKUTXN txn,
BOOL reserve_memory)
/* Effect: called by DB_ENV->create_loader to create a brt loader. /* Effect: called by DB_ENV->create_loader to create a brt loader.
* Arguments: * Arguments:
* blp Return the brt loader here. * blp Return the brt loader here.
...@@ -600,7 +607,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, ...@@ -600,7 +607,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
bt_compare_functions, bt_compare_functions,
temp_file_template, temp_file_template,
load_lsn, load_lsn,
txn); txn,
reserve_memory);
if (r!=0) result = r; if (r!=0) result = r;
} }
if (result==0) { if (result==0) {
......
...@@ -25,7 +25,8 @@ int toku_ft_loader_open (FTLOADER *bl, ...@@ -25,7 +25,8 @@ int toku_ft_loader_open (FTLOADER *bl,
ft_compare_func bt_compare_functions[/*N*/], ft_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn); TOKUTXN txn,
BOOL reserve_memory);
int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val); int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val);
......
...@@ -88,7 +88,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) { ...@@ -88,7 +88,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail) {
} }
FTLOADER loader; FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE); r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, TRUE);
assert(r == 0); assert(r == 0);
struct rowset *rowset[nrowsets]; struct rowset *rowset[nrowsets];
......
...@@ -103,7 +103,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail, const char ...@@ -103,7 +103,7 @@ static void test_extractor(int nrows, int nrowsets, BOOL expect_fail, const char
sprintf(temp, "%s/%s", testdir, "tempXXXXXX"); sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
FTLOADER loader; FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE); r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, TRUE);
assert(r == 0); assert(r == 0);
struct rowset *rowset[nrowsets]; struct rowset *rowset[nrowsets];
......
...@@ -323,7 +323,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) { ...@@ -323,7 +323,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) {
sprintf(temp, "%s/%s", testdir, "tempXXXXXX"); sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
FTLOADER loader; FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE); r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, TRUE);
assert(r == 0); assert(r == 0);
struct rowset *rowset[nrowsets]; struct rowset *rowset[nrowsets];
......
...@@ -327,7 +327,7 @@ static void test (const char *directory, BOOL is_error) { ...@@ -327,7 +327,7 @@ static void test (const char *directory, BOOL is_error) {
bt_compare_functions, bt_compare_functions,
"tempxxxxxx", "tempxxxxxx",
*lsnp, *lsnp,
TXNID_NONE); TXNID_NONE, TRUE);
assert(r==0); assert(r==0);
} }
......
...@@ -60,7 +60,7 @@ static void test_loader_open(int ndbs) { ...@@ -60,7 +60,7 @@ static void test_loader_open(int ndbs) {
for (i = 0; ; i++) { for (i = 0; ; i++) {
set_my_malloc_trigger(i+1); set_my_malloc_trigger(i+1);
r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE); r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, TRUE);
if (r == 0) if (r == 0)
break; break;
} }
......
...@@ -202,7 +202,7 @@ toku_indexer_create_indexer(DB_ENV *env, ...@@ -202,7 +202,7 @@ toku_indexer_create_indexer(DB_ENV *env,
// //
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
DB_LOADER* loader = NULL; DB_LOADER* loader = NULL;
int r = env->create_loader(env, txn, &loader, dest_dbs[i], 1, &dest_dbs[i], NULL, NULL, DB_PRELOCKED_WRITE); int r = env->create_loader(env, txn, &loader, dest_dbs[i], 1, &dest_dbs[i], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_USE_PUTS);
if (r) { if (r) {
goto create_exit; goto create_exit;
} }
...@@ -485,7 +485,7 @@ close_indexer(DB_INDEXER *indexer) { ...@@ -485,7 +485,7 @@ close_indexer(DB_INDEXER *indexer) {
(void) __sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1); (void) __sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
// Mark txn as needing a checkpoint. // Mark txn as needing a checkpoint.
// (This will cause a local checkpoint of created index files, which is necessary // (This will cause a checkpoint, which is necessary
// because these files are not necessarily on disk and all the operations // because these files are not necessarily on disk and all the operations
// to create them are not in the recovery log.) // to create them are not in the recovery log.)
DB_TXN *txn = indexer->i->txn; DB_TXN *txn = indexer->i->txn;
......
...@@ -194,6 +194,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -194,6 +194,7 @@ int toku_loader_create_loader(DB_ENV *env,
*blp = NULL; // set later when created *blp = NULL; // set later when created
DB_LOADER *loader = NULL; DB_LOADER *loader = NULL;
BOOL use_puts = loader_flags&LOADER_USE_PUTS;
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
...@@ -272,7 +273,8 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -272,7 +273,8 @@ int toku_loader_create_loader(DB_ENV *env,
compare_functions, compare_functions,
loader->i->temp_file_template, loader->i->temp_file_template,
load_lsn, load_lsn,
ttxn); ttxn,
use_puts);
if ( r!=0 ) { if ( r!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
toku_free(brts); toku_free(brts);
...@@ -282,7 +284,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -282,7 +284,7 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->inames_in_env = new_inames_in_env; loader->i->inames_in_env = new_inames_in_env;
toku_free(brts); toku_free(brts);
if (loader->i->loader_flags & LOADER_USE_PUTS) { if (use_puts) {
XCALLOC_N(loader->i->N, loader->i->ekeys); XCALLOC_N(loader->i->N, loader->i->ekeys);
XCALLOC_N(loader->i->N, loader->i->evals); XCALLOC_N(loader->i->N, loader->i->evals);
// the following function grabs the ydb lock, so we // the following function grabs the ydb lock, so we
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment