Commit a5d06963 authored by Rich Prohaska's avatar Rich Prohaska

#137 error out env->open when worker threads can not be created

parent 11e0cfaf
......@@ -459,7 +459,7 @@ private:
//
class checkpointer {
public:
void init(pair_list *_pl, TOKULOGGER _logger, evictor *_ev, cachefile_list *files);
int init(pair_list *_pl, TOKULOGGER _logger, evictor *_ev, cachefile_list *files);
void destroy();
void set_checkpoint_period(uint32_t new_period);
uint32_t get_checkpoint_period();
......@@ -481,6 +481,8 @@ private:
cachefile_list *m_cf_list;
pair_list *m_list;
evictor *m_ev;
bool m_checkpointer_cron_init;
bool m_checkpointer_init;
// variable used by the checkpoint thread to know
// when all work induced by cloning on client threads is done
......@@ -513,7 +515,7 @@ const int EVICTION_PERIOD = 1;
//
class evictor {
public:
void init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period);
int init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period);
void destroy();
void add_pair_attr(PAIR_ATTR attr);
void remove_pair_attr(PAIR_ATTR attr);
......@@ -598,6 +600,10 @@ private:
// this variable is ONLY used for testing purposes
uint64_t m_num_eviction_thread_runs;
bool m_ev_thread_init;
bool m_evictor_init;
friend class evictor_test_helpers;
friend class evictor_unit_test;
};
......@@ -609,7 +615,7 @@ private:
//
class cleaner {
public:
void init(uint32_t cleaner_iterations, pair_list* _pl, CACHETABLE _ct);
int init(uint32_t cleaner_iterations, pair_list* _pl, CACHETABLE _ct);
void destroy(void);
uint32_t get_iterations(void);
void set_iterations(uint32_t new_iterations);
......@@ -626,6 +632,8 @@ private:
// minimum period of 1s so if you want
// more frequent cleaner runs you must
// use this)
bool m_cleaner_cron_init;
bool m_cleaner_init;
};
///////////////////////////////////////////////////////////////////////////////
......
......@@ -291,7 +291,10 @@ uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) {
// reserve 25% as "unreservable". The loader cannot have it.
#define unreservable_memory(size) ((size)/4)
void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
int toku_cachetable_create(CACHETABLE *ct_result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
int result = 0;
int r;
if (size_limit == 0) {
size_limit = 128*1024*1024;
}
......@@ -301,16 +304,46 @@ void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN UU(initial_
ct->cf_list.init();
int num_processors = toku_os_get_number_active_processors();
ct->client_kibbutz = toku_kibbutz_create(num_processors);
ct->ct_kibbutz = toku_kibbutz_create(2*num_processors);
int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
ct->checkpointing_kibbutz = toku_kibbutz_create(checkpointing_nworkers);
r = toku_kibbutz_create(num_processors, &ct->client_kibbutz);
if (r != 0) {
result = r;
goto cleanup;
}
r = toku_kibbutz_create(2*num_processors, &ct->ct_kibbutz);
if (r != 0) {
result = r;
goto cleanup;
}
r = toku_kibbutz_create(checkpointing_nworkers, &ct->checkpointing_kibbutz);
if (r != 0) {
result = r;
goto cleanup;
}
// must be done after creating ct_kibbutz
ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
if (r != 0) {
result = r;
goto cleanup;
}
r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
if (r != 0) {
result = r;
goto cleanup;
}
r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
if (r != 0) {
result = r;
goto cleanup;
}
ct->env_dir = toku_xstrdup(".");
*result = ct;
cleanup:
if (result == 0) {
*ct_result = ct;
} else {
toku_cachetable_close(&ct);
}
return result;
}
// Returns a pointer to the checkpoint contained within
......@@ -2584,8 +2617,11 @@ void toku_cachetable_close (CACHETABLE *ctp) {
ct->list.destroy();
ct->cf_list.destroy();
if (ct->client_kibbutz)
toku_kibbutz_destroy(ct->client_kibbutz);
if (ct->ct_kibbutz)
toku_kibbutz_destroy(ct->ct_kibbutz);
if (ct->checkpointing_kibbutz)
toku_kibbutz_destroy(ct->checkpointing_kibbutz);
toku_free(ct->env_dir);
toku_free(ct);
......@@ -3071,18 +3107,27 @@ int toku_cleaner_thread (void *cleaner_v) {
//
ENSURE_POD(cleaner);
void cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
// default is no cleaner, for now
toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
m_cleaner_cron_init = false;
int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
if (r == 0) {
m_cleaner_cron_init = true;
}
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations);
m_cleaner_iterations = _cleaner_iterations;
m_pl = _pl;
m_ct = _ct;
m_cleaner_init = true;
return r;
}
// this function is allowed to be called multiple times
void cleaner::destroy(void) {
if (!toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
if (!m_cleaner_init) {
return;
}
if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
// for test code only, production code uses toku_cachetable_minicron_shutdown()
int r = toku_minicron_shutdown(&m_cleaner_cron);
assert(r==0);
......@@ -3659,7 +3704,7 @@ static void *eviction_thread(void *evictor_v) {
// Starts the eviction thread, assigns external object references,
// and initializes all counters and condition variables.
//
void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting);
......@@ -3713,8 +3758,13 @@ void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, K
// start the background thread
m_run_thread = true;
m_num_eviction_thread_runs = 0;
m_ev_thread_init = false;
r = toku_pthread_create(&m_ev_thread, NULL, eviction_thread, this);
assert_zero(r);
if (r == 0) {
m_ev_thread_init = true;
}
m_evictor_init = true;
return r;
}
//
......@@ -3723,6 +3773,9 @@ void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, K
// NOTE: This should only be called if there are no evictions in progress.
//
void evictor::destroy() {
if (!m_evictor_init) {
return;
}
assert(m_size_evicting == 0);
//
// commented out of Ming, because we could not finish
......@@ -3731,16 +3784,16 @@ void evictor::destroy() {
//assert(m_size_current == 0);
// Stop the eviction thread.
if (m_ev_thread_init) {
toku_mutex_lock(&m_ev_thread_lock);
m_run_thread = false;
this->signal_eviction_thread();
toku_mutex_unlock(&m_ev_thread_lock);
void *ret;
int r = toku_pthread_join(m_ev_thread, &ret);
assert_zero(r);
assert(!m_ev_thread_is_running);
}
destroy_partitioned_counter(m_size_nonleaf);
m_size_nonleaf = NULL;
destroy_partitioned_counter(m_size_leaf);
......@@ -4345,7 +4398,7 @@ ENSURE_POD(checkpointer);
//
// Sets the cachetable reference in this checkpointer class, this is temporary.
//
void checkpointer::init(pair_list *_pl,
int checkpointer::init(pair_list *_pl,
TOKULOGGER _logger,
evictor *_ev,
cachefile_list *files) {
......@@ -4356,11 +4409,20 @@ void checkpointer::init(pair_list *_pl,
bjm_init(&m_checkpoint_clones_bjm);
// Default is no checkpointing.
toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
m_checkpointer_cron_init = false;
int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
if (r == 0) {
m_checkpointer_cron_init = true;
}
m_checkpointer_init = true;
return r;
}
void checkpointer::destroy() {
if (!this->has_been_shutdown()) {
if (!m_checkpointer_init) {
return;
}
if (m_checkpointer_cron_init && !this->has_been_shutdown()) {
// for test code only, production code uses toku_cachetable_minicron_shutdown()
int r = this->shutdown();
assert(r == 0);
......
......@@ -122,7 +122,7 @@ uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct);
// create and initialize a cache table
// size_limit is the upper limit on the size of the size of the values in the table
// pass 0 if you want the default
void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER);
int toku_cachetable_create(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER);
// Create a new cachetable.
// Effects: a new cachetable is created and initialized.
......
......@@ -116,7 +116,9 @@ void evictor_unit_test::init() {
ZERO_STRUCT(m_cf_list);
m_pl.init();
m_cf_list.init();
m_kb = toku_kibbutz_create(1);
m_kb = NULL;
int r = toku_kibbutz_create(1, &m_kb);
assert(r == 0);
}
// destroy class after tests have run
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2009-2013 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <db.h>
#include <sys/resource.h>
static int env_open_close(void) {
int result = 0;
int r;
DB_ENV *env = NULL;
r = db_env_create(&env, 0);
assert(r == 0);
env->set_errfile(env, stderr);
r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_LOCK+DB_INIT_MPOOL+DB_INIT_TXN+DB_INIT_LOG + DB_CREATE + DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO);
if (r != 0) {
fprintf(stderr, "%s:%u r=%d\n", __FILE__, __LINE__, r);
result = r;
}
r = env->close(env, 0);
assert(r == 0);
return result;
}
int test_main (int argc, char * const argv[]) {
int r;
int limit = 1;
// parse_args(argc, argv);
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-v") == 0) {
verbose++;
continue;
}
if (strcmp(argv[i], "-q") == 0) {
if (verbose > 0) verbose--;
continue;
}
limit = atoi(argv[i]);
continue;
}
toku_os_recursive_delete(TOKU_TEST_FILENAME);
r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);
struct rlimit nproc_rlimit;
r = getrlimit(RLIMIT_NPROC, &nproc_rlimit);
assert(r == 0);
nproc_rlimit.rlim_cur = limit;
r = setrlimit(RLIMIT_NPROC, &nproc_rlimit);
assert(r == 0);
printf("nproc %lu\n", nproc_rlimit.rlim_cur);
(void) env_open_close();
return 0;
}
......@@ -1158,7 +1158,9 @@ static void scan_op_worker(void *arg) {
static int UU() scan_op_no_check_parallel(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
const int num_cores = toku_os_get_number_processors();
const int num_workers = arg->cli->num_DBs < num_cores ? arg->cli->num_DBs : num_cores;
KIBBUTZ kibbutz = toku_kibbutz_create(num_workers);
KIBBUTZ kibbutz = NULL;
int r = toku_kibbutz_create(num_workers, &kibbutz);
assert(r == 0);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
struct scan_op_worker_info *XCALLOC(info);
info->db = arg->dbp[i];
......@@ -2111,7 +2113,9 @@ static int fill_tables_default(DB_ENV *env, DB **dbs, struct cli_args *args, boo
// be used for internal engine work (ie: flushes, loader threads, etc).
const int max_num_workers = (num_cores + 1) / 2;
const int num_workers = args->num_DBs < max_num_workers ? args->num_DBs : max_num_workers;
KIBBUTZ kibbutz = toku_kibbutz_create(num_workers);
KIBBUTZ kibbutz = NULL;
int r = toku_kibbutz_create(num_workers, &kibbutz);
assert(r == 0);
for (int i = 0; i < args->num_DBs; i++) {
struct fill_table_worker_info *XCALLOC(info);
info->env = env;
......
......@@ -365,7 +365,7 @@ env_fs_init(DB_ENV *env) {
static int
env_fs_init_minicron(DB_ENV *env) {
int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
assert(r == 0);
if (r == 0)
env->i->fs_poller_is_init = true;
return r;
}
......@@ -402,11 +402,12 @@ env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
}
}
static void
static int
env_fsync_log_cron_init(DB_ENV *env) {
int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
assert(r == 0);
if (r == 0)
env->i->fsync_log_cron_is_init = true;
return r;
}
static void
......@@ -994,7 +995,11 @@ env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
if (env->i->cachetable==NULL) {
// If we ran recovery then the cachetable should be set here.
toku_cachetable_create(&env->i->cachetable, env->i->cachetable_size, ZERO_LSN, env->i->logger);
r = toku_cachetable_create(&env->i->cachetable, env->i->cachetable_size, ZERO_LSN, env->i->logger);
if (r != 0) {
r = toku_ydb_do_error(env, r, "Cant create a cachetable\n");
goto cleanup;
}
}
toku_cachetable_set_env_dir(env->i->cachetable, env->i->dir);
......@@ -1009,7 +1014,7 @@ env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
bool create_new_rollback_file = newenv | upgrade_in_progress;
r = toku_logger_open_rollback(env->i->logger, env->i->cachetable, create_new_rollback_file);
if (r != 0) {
r = toku_ydb_do_error(env, r, "cant open rollback");
r = toku_ydb_do_error(env, r, "Cant open rollback\n");
goto cleanup;
}
}
......@@ -1027,7 +1032,7 @@ env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
assert_zero(r);
r = toku_db_open_iname(env->i->persistent_environment, txn, toku_product_name_strings.environmentdictionary, DB_CREATE, mode);
if (r != 0) {
r = toku_ydb_do_error(env, r, "cant open persistent env");
r = toku_ydb_do_error(env, r, "Cant open persistent env\n");
goto cleanup;
}
if (newenv) {
......@@ -1065,20 +1070,29 @@ env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
assert_zero(r);
r = toku_db_open_iname(env->i->directory, txn, toku_product_name_strings.fileopsdirectory, DB_CREATE, mode);
if (r != 0) {
r = toku_ydb_do_error(env, r, "cant open %s", toku_product_name_strings.fileopsdirectory);
r = toku_ydb_do_error(env, r, "Cant open %s\n", toku_product_name_strings.fileopsdirectory);
goto cleanup;
}
}
if (using_txns) {
r = locked_txn_commit(txn, 0);
assert_zero(r);
txn = NULL;
}
cp = toku_cachetable_get_checkpointer(env->i->cachetable);
r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
assert_zero(r);
env_fs_poller(env); // get the file system state at startup
env_fs_init_minicron(env);
env_fsync_log_cron_init(env);
r = env_fs_init_minicron(env);
if (r != 0) {
r = toku_ydb_do_error(env, r, "Cant create fs minicron\n");
goto cleanup;
}
r = env_fsync_log_cron_init(env);
if (r != 0) {
r = toku_ydb_do_error(env, r, "Cant create fsync log minicron\n");
goto cleanup;
}
cleanup:
if (r!=0) {
if (txn) {
......
......@@ -118,7 +118,9 @@ struct kibbutz {
static void *work_on_kibbutz (void *);
KIBBUTZ toku_kibbutz_create (int n_workers) {
int toku_kibbutz_create (int n_workers, KIBBUTZ *kb_ret) {
int r = 0;
*kb_ret = NULL;
KIBBUTZ XCALLOC(k);
toku_mutex_init(&k->mutex, NULL);
toku_cond_init(&k->cond, NULL);
......@@ -128,12 +130,19 @@ KIBBUTZ toku_kibbutz_create (int n_workers) {
k->n_workers = n_workers;
XMALLOC_N(n_workers, k->workers);
XMALLOC_N(n_workers, k->ids);
for (int i=0; i<n_workers; i++) {
for (int i = 0; i < n_workers; i++) {
k->ids[i].k = k;
int r = toku_pthread_create(&k->workers[i], NULL, work_on_kibbutz, &k->ids[i]);
assert(r==0);
r = toku_pthread_create(&k->workers[i], NULL, work_on_kibbutz, &k->ids[i]);
if (r != 0) {
k->n_workers = i;
toku_kibbutz_destroy(k);
break;
}
}
if (r == 0) {
*kb_ret = k;
}
return k;
return r;
}
static void klock (KIBBUTZ k) {
......
......@@ -100,7 +100,7 @@ typedef struct kibbutz *KIBBUTZ;
//
// create a kibbutz where n_workers is the number of threads in the threadpool
//
KIBBUTZ toku_kibbutz_create (int n_workers);
int toku_kibbutz_create (int n_workers, KIBBUTZ *kb);
//
// enqueue a workitem in the kibbutz. When the kibbutz is to work on this workitem,
// it calls f(extra).
......
......@@ -111,7 +111,9 @@ static void dowork (void *idv) {
}
static void kibbutz_test (bool parent_finishes_first) {
KIBBUTZ k = toku_kibbutz_create(NT);
KIBBUTZ k = NULL;
int r = toku_kibbutz_create(NT, &k);
assert(r == 0);
if (verbose) printf("create\n");
int ids[ND];
for (int i=0; i<ND; i++) {
......
......@@ -113,7 +113,9 @@ static void dowork (void *idv) {
}
static void kibbutz_test (void) {
KIBBUTZ k = toku_kibbutz_create(1);
KIBBUTZ k = NULL;
int r = toku_kibbutz_create(1, &k);
assert(r == 0);
if (verbose) printf("create\n");
int ids[ND];
for (int i=0; i<ND; i++) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment