Commit 8793bdfa authored by rich prohaska's avatar rich prohaska

tokutek/ft-index#80 tokutek/ft-engine#94 impose an upper bound on loader memory reservations

parent d38e56cb
...@@ -459,6 +459,8 @@ static void print_db_env_struct (void) { ...@@ -459,6 +459,8 @@ static void print_db_env_struct (void) {
"void (*change_fsync_log_period) (DB_ENV*, uint32_t)", "void (*change_fsync_log_period) (DB_ENV*, uint32_t)",
"int (*iterate_live_transactions) (DB_ENV *env, iterate_transactions_callback callback, void *extra)", "int (*iterate_live_transactions) (DB_ENV *env, iterate_transactions_callback callback, void *extra)",
"int (*iterate_pending_lock_requests) (DB_ENV *env, iterate_requests_callback callback, void *extra)", "int (*iterate_pending_lock_requests) (DB_ENV *env, iterate_requests_callback callback, void *extra)",
"void (*set_loader_memory_size)(DB_ENV *env, uint64_t loader_memory_size)",
"uint64_t (*get_loader_memory_size)(DB_ENV *env)",
NULL}; NULL};
sort_and_dump_fields("db_env", true, extra); sort_and_dump_fields("db_env", true, extra);
......
...@@ -519,7 +519,7 @@ public: ...@@ -519,7 +519,7 @@ public:
void change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr); void change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr);
void add_to_size_current(long size); void add_to_size_current(long size);
void remove_from_size_current(long size); void remove_from_size_current(long size);
uint64_t reserve_memory(double fraction); uint64_t reserve_memory(double fraction, uint64_t upper_bound);
void release_reserved_memory(uint64_t reserved_memory); void release_reserved_memory(uint64_t reserved_memory);
void run_eviction_thread(); void run_eviction_thread();
void do_partial_eviction(PAIR p, bool pair_mutex_held); void do_partial_eviction(PAIR p, bool pair_mutex_held);
......
...@@ -319,9 +319,8 @@ CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) { ...@@ -319,9 +319,8 @@ CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) {
return &ct->cp; return &ct->cp;
} }
uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction) { uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) {
uint64_t reserved_memory = 0; uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound);
reserved_memory = ct->ev.reserve_memory(fraction);
return reserved_memory; return reserved_memory;
} }
...@@ -3818,10 +3817,15 @@ void evictor::remove_from_size_current(long size) { ...@@ -3818,10 +3817,15 @@ void evictor::remove_from_size_current(long size) {
// //
// TODO: (Zardosht) comment this function // TODO: (Zardosht) comment this function
// //
uint64_t evictor::reserve_memory(double fraction) { uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) {
uint64_t reserved_memory = 0;
toku_mutex_lock(&m_ev_thread_lock); toku_mutex_lock(&m_ev_thread_lock);
reserved_memory = fraction * (m_low_size_watermark - m_size_reserved); uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved);
if (0) { // debug
fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n", __PRETTY_FUNCTION__, reserved_memory, upper_bound);
}
if (upper_bound > 0 && reserved_memory > upper_bound) {
reserved_memory = upper_bound;
}
m_size_reserved += reserved_memory; m_size_reserved += reserved_memory;
(void) toku_sync_fetch_and_add(&m_size_current, reserved_memory); (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory);
this->signal_eviction_thread(); this->signal_eviction_thread();
......
...@@ -177,7 +177,7 @@ FILENUM toku_cachetable_reserve_filenum(CACHETABLE ct); ...@@ -177,7 +177,7 @@ FILENUM toku_cachetable_reserve_filenum(CACHETABLE ct);
// Returns the amount reserved. // Returns the amount reserved.
// To return the memory to the cachetable, call toku_cachetable_release_reserved_memory // To return the memory to the cachetable, call toku_cachetable_release_reserved_memory
// Requires 0<fraction<1. // Requires 0<fraction<1.
uint64_t toku_cachetable_reserve_memory(CACHETABLE, double fraction); uint64_t toku_cachetable_reserve_memory(CACHETABLE, double fraction, uint64_t upper_bound);
void toku_cachetable_release_reserved_memory(CACHETABLE, uint64_t); void toku_cachetable_release_reserved_memory(CACHETABLE, uint64_t);
// cachefile operations // cachefile operations
......
...@@ -343,6 +343,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -343,6 +343,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn, TOKUTXN txn,
bool reserve_memory, bool reserve_memory,
uint64_t reserve_memory_size,
bool compress_intermediates); bool compress_intermediates);
void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error); void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
......
...@@ -542,6 +542,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -542,6 +542,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn, TOKUTXN txn,
bool reserve_memory, bool reserve_memory,
uint64_t reserve_memory_size,
bool compress_intermediates) bool compress_intermediates)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread. // Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
{ {
...@@ -552,14 +553,16 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -552,14 +553,16 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
bl->cachetable = cachetable; bl->cachetable = cachetable;
if (reserve_memory && bl->cachetable) { if (reserve_memory && bl->cachetable) {
bl->did_reserve_memory = true; bl->did_reserve_memory = true;
bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with). bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0, reserve_memory_size); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with).
} }
else { else {
bl->did_reserve_memory = false; bl->did_reserve_memory = false;
bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB. bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
} }
bl->compress_intermediates = compress_intermediates; bl->compress_intermediates = compress_intermediates;
//printf("Reserved memory=%ld\n", bl->reserved_memory); if (0) { // debug
fprintf(stderr, "%s Reserved memory=%ld\n", __FUNCTION__, bl->reserved_memory);
}
bl->src_db = src_db; bl->src_db = src_db;
bl->N = N; bl->N = N;
...@@ -646,6 +649,7 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, ...@@ -646,6 +649,7 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn, TOKUTXN txn,
bool reserve_memory, bool reserve_memory,
uint64_t reserve_memory_size,
bool compress_intermediates) bool compress_intermediates)
/* Effect: called by DB_ENV->create_loader to create a brt loader. /* Effect: called by DB_ENV->create_loader to create a brt loader.
* Arguments: * Arguments:
...@@ -669,6 +673,7 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, ...@@ -669,6 +673,7 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
load_lsn, load_lsn,
txn, txn,
reserve_memory, reserve_memory,
reserve_memory_size,
compress_intermediates); compress_intermediates);
if (r!=0) result = r; if (r!=0) result = r;
} }
......
...@@ -112,6 +112,7 @@ int toku_ft_loader_open (FTLOADER *bl, ...@@ -112,6 +112,7 @@ int toku_ft_loader_open (FTLOADER *bl,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn, TOKUTXN txn,
bool reserve_memory, bool reserve_memory,
uint64_t reserve_memory_size,
bool compress_intermediates); bool compress_intermediates);
int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val); int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val);
......
...@@ -237,7 +237,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() { ...@@ -237,7 +237,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() {
this->verify_ev_init(limit); this->verify_ev_init(limit);
assert(m_ev.m_size_reserved == expected_m_size_reserved); assert(m_ev.m_size_reserved == expected_m_size_reserved);
m_ev.m_num_eviction_thread_runs = 0; m_ev.m_num_eviction_thread_runs = 0;
m_ev.reserve_memory(0.5); m_ev.reserve_memory(0.5, 0);
assert(m_ev.m_size_reserved == 100+150); //100 original, 150 from last call assert(m_ev.m_size_reserved == 100+150); //100 original, 150 from last call
assert(m_ev.m_size_current == 150); assert(m_ev.m_size_current == 150);
assert(m_ev.m_size_evicting == 0); assert(m_ev.m_size_evicting == 0);
......
...@@ -170,7 +170,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) { ...@@ -170,7 +170,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) {
} }
FTLOADER loader; FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false); r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, 0, false);
assert(r == 0); assert(r == 0);
struct rowset *rowset[nrowsets]; struct rowset *rowset[nrowsets];
......
...@@ -180,7 +180,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char ...@@ -180,7 +180,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char
sprintf(temp, "%s/%s", testdir, "tempXXXXXX"); sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
FTLOADER loader; FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false); r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, 0, false);
assert(r == 0); assert(r == 0);
struct rowset *rowset[nrowsets]; struct rowset *rowset[nrowsets];
......
...@@ -402,7 +402,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) { ...@@ -402,7 +402,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) {
sprintf(temp, "%s/%s", testdir, "tempXXXXXX"); sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
FTLOADER loader; FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true, false); r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true, 0, false);
assert(r == 0); assert(r == 0);
struct rowset *rowset[nrowsets]; struct rowset *rowset[nrowsets];
......
...@@ -412,7 +412,7 @@ static void test (const char *directory, bool is_error) { ...@@ -412,7 +412,7 @@ static void test (const char *directory, bool is_error) {
bt_compare_functions, bt_compare_functions,
"tempxxxxxx", "tempxxxxxx",
*lsnp, *lsnp,
TXNID_NONE, true, false); TXNID_NONE, true, 0, false);
assert(r==0); assert(r==0);
} }
......
...@@ -143,7 +143,7 @@ static void test_loader_open(int ndbs) { ...@@ -143,7 +143,7 @@ static void test_loader_open(int ndbs) {
for (i = 0; ; i++) { for (i = 0; ; i++) {
set_my_malloc_trigger(i+1); set_my_malloc_trigger(i+1);
r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true, false); r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true, 0, false);
if (r == 0) if (r == 0)
break; break;
} }
......
...@@ -100,11 +100,11 @@ static void test_cachetable_reservation (long size) { ...@@ -100,11 +100,11 @@ static void test_cachetable_reservation (long size) {
toku_cachetable_create(&ct, size, ZERO_LSN, NULL); toku_cachetable_create(&ct, size, ZERO_LSN, NULL);
} }
{ {
uint64_t r0 = toku_cachetable_reserve_memory(ct, 0.5); uint64_t r0 = toku_cachetable_reserve_memory(ct, 0.5, 0);
uint64_t r0_bound = size/2 + size/16; uint64_t r0_bound = size/2 + size/16;
uint64_t r1 = toku_cachetable_reserve_memory(ct, 0.5); uint64_t r1 = toku_cachetable_reserve_memory(ct, 0.5, 0);
uint64_t r1_bound = r0_bound/2; uint64_t r1_bound = r0_bound/2;
uint64_t r2 = toku_cachetable_reserve_memory(ct, 0.5); uint64_t r2 = toku_cachetable_reserve_memory(ct, 0.5, 0);
uint64_t r2_bound = r1_bound/2; uint64_t r2_bound = r1_bound/2;
if (verbose) printf("%10ld: r0=%10" PRIu64 " r1=%10" PRIu64 " r2=%10" PRIu64 "\n", size, r0, r1, r2); if (verbose) printf("%10ld: r0=%10" PRIu64 " r1=%10" PRIu64 " r2=%10" PRIu64 "\n", size, r0, r1, r2);
assert(r0 <= r0_bound); assert(r0 <= r0_bound);
......
...@@ -330,6 +330,7 @@ toku_loader_create_loader(DB_ENV *env, ...@@ -330,6 +330,7 @@ toku_loader_create_loader(DB_ENV *env,
load_lsn, load_lsn,
ttxn, ttxn,
puts_allowed, puts_allowed,
loader->i->env->i->loader_memory_size,
compress_intermediates); compress_intermediates);
if ( rval!=0 ) { if ( rval!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
......
...@@ -66,6 +66,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -66,6 +66,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
directory_lock directory_lock
diskfull diskfull
dump-env dump-env
env_loader_memory
env-put-multiple env-put-multiple
env_startup env_startup
execute-updates execute-updates
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2009-2013 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <db.h>
int test_main (int argc, char * const argv[]) {
parse_args(argc, argv);
int r;
DB_ENV *env;
r = db_env_create(&env, 0);
assert_zero(r);
for (uint64_t n = 0 ; n < 10000000000; n += 1000000000) {
env->set_loader_memory_size(env, n);
assert(env->get_loader_memory_size(env) == n);
}
r = env->close(env, 0);
assert_zero(r);
return 0;
}
...@@ -182,6 +182,7 @@ struct __toku_db_env_internal { ...@@ -182,6 +182,7 @@ struct __toku_db_env_internal {
int datadir_lockfd; int datadir_lockfd;
int logdir_lockfd; int logdir_lockfd;
int tmpdir_lockfd; int tmpdir_lockfd;
uint64_t loader_memory_size;
}; };
// test-only environment function for running lock escalation // test-only environment function for running lock escalation
......
...@@ -2416,6 +2416,14 @@ env_iterate_live_transactions(DB_ENV *env, ...@@ -2416,6 +2416,14 @@ env_iterate_live_transactions(DB_ENV *env,
return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e); return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
} }
static void env_set_loader_memory_size(DB_ENV *env, uint64_t loader_memory_size) {
env->i->loader_memory_size = loader_memory_size;
}
static uint64_t env_get_loader_memory_size(DB_ENV *env) {
return env->i->loader_memory_size;
}
static int static int
toku_env_create(DB_ENV ** envp, uint32_t flags) { toku_env_create(DB_ENV ** envp, uint32_t flags) {
int r = ENOSYS; int r = ENOSYS;
...@@ -2489,6 +2497,8 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) { ...@@ -2489,6 +2497,8 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(iterate_pending_lock_requests); USENV(iterate_pending_lock_requests);
USENV(iterate_live_transactions); USENV(iterate_live_transactions);
USENV(change_fsync_log_period); USENV(change_fsync_log_period);
USENV(set_loader_memory_size);
USENV(get_loader_memory_size);
#undef USENV #undef USENV
// unlocked methods // unlocked methods
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment