Commit 0bfc5aeb authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #6162, merge fractal tree piece to main

git-svn-id: file:///svn/toku/tokudb@54006 c7de825b-a66e-492c-adef-691d508d4ae1
parent 80c9a926
......@@ -365,6 +365,7 @@ static void print_db_env_struct (void) {
"int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
"int (*get_cursor_for_directory) (DB_ENV*, /*in*/ DB_TXN *, /*out*/ DBC **)",
"int (*get_cursor_for_persistent_environment) (DB_ENV*, /*in*/ DB_TXN *, /*out*/ DBC **)",
"void (*change_fsync_log_period)(DB_ENV*, uint32_t)",
NULL};
sort_and_dump_fields("db_env", true, extra);
......
......@@ -494,7 +494,6 @@ public:
void destroy(void);
uint32_t get_iterations(void);
void set_iterations(uint32_t new_iterations);
uint32_t get_period(void);
uint32_t get_period_unlocked(void);
void set_period(uint32_t new_period);
int run_cleaner(void);
......
......@@ -179,10 +179,6 @@ void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) {
ct->cp.set_checkpoint_period(new_period);
}
uint32_t toku_get_checkpoint_period (CACHETABLE ct) {
return ct->cp.get_checkpoint_period();
}
uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) {
return ct->cp.get_checkpoint_period();
}
......@@ -191,10 +187,6 @@ void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) {
ct->cl.set_period(new_period);
}
uint32_t toku_get_cleaner_period (CACHETABLE ct) {
return ct->cl.get_period();
}
uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) {
return ct->cl.get_period_unlocked();
}
......@@ -3016,16 +3008,15 @@ void cleaner::set_iterations(uint32_t new_iterations) {
m_cleaner_iterations = new_iterations;
}
uint32_t cleaner::get_period(void) {
return toku_minicron_get_period(&m_cleaner_cron);
}
uint32_t cleaner::get_period_unlocked(void) {
return toku_minicron_get_period_unlocked(&m_cleaner_cron);
return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron);
}
//
// Sets how often the cleaner thread will run, in seconds
//
void cleaner::set_period(uint32_t new_period) {
toku_minicron_change_period(&m_cleaner_cron, new_period);
toku_minicron_change_period(&m_cleaner_cron, new_period*1000);
}
// Effect: runs a cleaner.
......@@ -4218,17 +4209,17 @@ void checkpointer::destroy() {
}
//
// Sets how often the checkpoint thread will run.
// Sets how often the checkpoint thread will run, in seconds
//
void checkpointer::set_checkpoint_period(uint32_t new_period) {
toku_minicron_change_period(&m_checkpointer_cron, new_period);
toku_minicron_change_period(&m_checkpointer_cron, new_period*1000);
}
//
// Sets how often the checkpoint thread will run.
//
uint32_t checkpointer::get_checkpoint_period() {
return toku_minicron_get_period(&m_checkpointer_cron);
return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron);
}
//
......
......@@ -27,7 +27,6 @@
typedef BLOCKNUM CACHEKEY;
void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period);
uint32_t toku_get_cleaner_period (CACHETABLE ct);
uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct);
void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations);
uint32_t toku_get_cleaner_iterations (CACHETABLE ct);
......
......@@ -15,7 +15,6 @@ void toku_set_checkpoint_period(CACHETABLE ct, uint32_t new_period);
//Effect: Change [end checkpoint (n) - begin checkpoint (n+1)] delay to
// new_period seconds. 0 means disable.
uint32_t toku_get_checkpoint_period(CACHETABLE ct);
uint32_t toku_get_checkpoint_period_unlocked(CACHETABLE ct);
......
......@@ -44,16 +44,22 @@ minicron_do (void *pv)
toku_mutex_unlock(&p->mutex);
return 0;
}
if (p->period_in_seconds==0) {
if (p->period_in_ms == 0) {
// if we aren't supposed to do it then just do an untimed wait.
toku_cond_wait(&p->condvar, &p->mutex);
} else {
}
else if (p->period_in_ms <= 1000) {
toku_mutex_unlock(&p->mutex);
usleep(p->period_in_ms * 1000);
toku_mutex_lock(&p->mutex);
}
else {
// Recompute the wakeup time every time (instead of once per call to f) in case the period changges.
toku_timespec_t wakeup_at = p->time_of_last_call_to_f;
wakeup_at.tv_sec += p->period_in_seconds;
wakeup_at.tv_sec += (p->period_in_ms/1000);
wakeup_at.tv_nsec += (p->period_in_ms % 1000) * 1000000;
toku_timespec_t now;
toku_gettime(&now);
//printf("wakeup at %.6f (after %d seconds) now=%.6f\n", wakeup_at.tv_sec + wakeup_at.tv_nsec*1e-9, p->period_in_seconds, now.tv_sec + now.tv_nsec*1e-9);
int r = toku_cond_timedwait(&p->condvar, &p->mutex, &wakeup_at);
if (r!=0 && r!=ETIMEDOUT) fprintf(stderr, "%s:%d r=%d (%s)", __FILE__, __LINE__, r, strerror(r));
assert(r==0 || r==ETIMEDOUT);
......@@ -63,12 +69,12 @@ minicron_do (void *pv)
toku_mutex_unlock(&p->mutex);
return 0;
}
if (p->period_in_seconds >0) {
// maybe do a checkpoint
if (p->period_in_ms > 1000) {
toku_timespec_t now;
toku_gettime(&now);
toku_timespec_t time_to_call = p->time_of_last_call_to_f;
time_to_call.tv_sec += p->period_in_seconds;
time_to_call.tv_sec += p->period_in_ms/1000;
time_to_call.tv_nsec += (p->period_in_ms % 1000) * 1000000;
int compare = timespec_compare(&time_to_call, &now);
//printf("compare(%.6f, %.6f)=%d\n", time_to_call.tv_sec + time_to_call.tv_nsec*1e-9, now.tv_sec+now.tv_nsec*1e-9, compare);
if (compare <= 0) {
......@@ -80,21 +86,26 @@ minicron_do (void *pv)
}
}
else {
toku_mutex_unlock(&p->mutex);
int r = p->f(p->arg);
assert(r==0);
toku_mutex_lock(&p->mutex);
}
}
}
int
toku_minicron_setup(struct minicron *p, uint32_t period_in_seconds, int(*f)(void *), void *arg)
toku_minicron_setup(struct minicron *p, uint32_t period_in_ms, int(*f)(void *), void *arg)
{
p->f = f;
p->arg = arg;
toku_gettime(&p->time_of_last_call_to_f);
//printf("now=%.6f", p->time_of_last_call_to_f.tv_sec + p->time_of_last_call_to_f.tv_nsec*1e-9);
p->period_in_seconds = period_in_seconds;
p->period_in_ms = period_in_ms;
p->do_shutdown = false;
toku_mutex_init(&p->mutex, 0);
toku_cond_init (&p->condvar, 0);
//printf("%s:%d setup period=%d\n", __FILE__, __LINE__, period_in_seconds);
return toku_pthread_create(&p->thread, 0, minicron_do, p);
}
......@@ -102,25 +113,24 @@ void
toku_minicron_change_period(struct minicron *p, uint32_t new_period)
{
toku_mutex_lock(&p->mutex);
p->period_in_seconds = new_period;
p->period_in_ms = new_period;
toku_cond_signal(&p->condvar);
toku_mutex_unlock(&p->mutex);
}
/* unlocked function for use by engine status which takes no locks */
uint32_t
toku_minicron_get_period(struct minicron *p)
toku_minicron_get_period_in_seconds_unlocked(struct minicron *p)
{
toku_mutex_lock(&p->mutex);
uint32_t retval = toku_minicron_get_period_unlocked(p);
toku_mutex_unlock(&p->mutex);
uint32_t retval = p->period_in_ms/1000;
return retval;
}
/* unlocked function for use by engine status which takes no locks */
uint32_t
toku_minicron_get_period_unlocked(struct minicron *p)
toku_minicron_get_period_in_ms_unlocked(struct minicron *p)
{
uint32_t retval = p->period_in_seconds;
uint32_t retval = p->period_in_ms;
return retval;
}
......
......@@ -16,7 +16,7 @@
// To create a minicron,
// 1) allocate a "struct minicron" somewhere.
// Rationale: This struct can be stored inside another struct (such as the cachetable), avoiding a malloc/free pair.
// 2) call toku_minicron_setup, specifying a period (in seconds), a function, and some arguments.
// 2) call toku_minicron_setup, specifying a period (in milliseconds), a function, and some arguments.
// If the period is positive then the function is called periodically (with the period specified)
// Note: The period is measured from when the previous call to f finishes to when the new call starts.
// Thus, if the period is 5 minutes, and it takes 8 minutes to run f, then the actual periodicity is 13 minutes.
......@@ -32,14 +32,14 @@ struct minicron {
toku_cond_t condvar;
int (*f)(void*);
void *arg;
uint32_t period_in_seconds;
uint32_t period_in_ms;
bool do_shutdown;
};
int toku_minicron_setup (struct minicron *s, uint32_t period_in_seconds, int(*f)(void *), void *arg);
int toku_minicron_setup (struct minicron *s, uint32_t period_in_ms, int(*f)(void *), void *arg);
void toku_minicron_change_period(struct minicron *p, uint32_t new_period);
uint32_t toku_minicron_get_period(struct minicron *p);
uint32_t toku_minicron_get_period_unlocked(struct minicron *p);
uint32_t toku_minicron_get_period_in_seconds_unlocked(struct minicron *p);
uint32_t toku_minicron_get_period_in_ms_unlocked(struct minicron *p);
int toku_minicron_shutdown(struct minicron *p);
bool toku_minicron_has_been_shutdown(struct minicron *p);
......
......@@ -53,7 +53,7 @@ test2 (void* v)
{
struct minicron m;
ZERO_STRUCT(m);
int r = toku_minicron_setup(&m, 10, never_run, 0); assert(r==0);
int r = toku_minicron_setup(&m, 10000, never_run, 0); assert(r==0);
sleep(2);
r = toku_minicron_shutdown(&m); assert(r==0);
return v;
......@@ -90,7 +90,7 @@ test3 (void* v)
gettimeofday(&tx.tv, 0);
tx.counter=0;
ZERO_STRUCT(m);
int r = toku_minicron_setup(&m, 1, run_5x, &tx); assert(r==0);
int r = toku_minicron_setup(&m, 1000, run_5x, &tx); assert(r==0);
sleep(5);
r = toku_minicron_shutdown(&m); assert(r==0);
assert(tx.counter>=4 && tx.counter<=5); // after 5 seconds it could have run 4 or 5 times.
......@@ -113,7 +113,7 @@ test4 (void *v) {
struct minicron m;
int counter = 0;
ZERO_STRUCT(m);
int r = toku_minicron_setup(&m, 2, run_3sec, &counter); assert(r==0);
int r = toku_minicron_setup(&m, 2000, run_3sec, &counter); assert(r==0);
sleep(9);
r = toku_minicron_shutdown(&m); assert(r==0);
assert(counter==2);
......@@ -125,8 +125,8 @@ test5 (void *v) {
struct minicron m;
int counter = 0;
ZERO_STRUCT(m);
int r = toku_minicron_setup(&m, 10, run_3sec, &counter); assert(r==0);
toku_minicron_change_period(&m, 2);
int r = toku_minicron_setup(&m, 10000, run_3sec, &counter); assert(r==0);
toku_minicron_change_period(&m, 2000);
sleep(9);
r = toku_minicron_shutdown(&m); assert(r==0);
assert(counter==2);
......@@ -137,7 +137,7 @@ static void*
test6 (void *v) {
struct minicron m;
ZERO_STRUCT(m);
int r = toku_minicron_setup(&m, 5, never_run, 0); assert(r==0);
int r = toku_minicron_setup(&m, 5000, never_run, 0); assert(r==0);
toku_minicron_change_period(&m, 0);
sleep(7);
r = toku_minicron_shutdown(&m); assert(r==0);
......
......@@ -145,6 +145,7 @@ struct arg {
struct cli_args *cli;
bool do_prepare;
bool prelock_updates;
bool track_thread_performance;
};
static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
......@@ -158,6 +159,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->operation_extra = nullptr;
arg->do_prepare = false;
arg->prelock_updates = false;
arg->track_thread_performance = true;
}
enum operation_type {
......@@ -518,7 +520,9 @@ static void *worker(void *arg_v) {
}
}
unlock_worker_op(we);
if (arg->track_thread_performance) {
we->counters[OPERATION]++;
}
if (arg->sleep_ms) {
usleep(arg->sleep_ms * 1000);
}
......
......@@ -88,6 +88,9 @@ struct __toku_db_env_internal {
int fs_poll_time; // Time in seconds between statfs calls
struct minicron fs_poller; // Poll the file systems
bool fs_poller_is_init;
uint32_t fsync_log_period_ms;
bool fsync_log_cron_is_init;
struct minicron fsync_log_cron; // fsync recovery log
int envdir_lockfd;
int datadir_lockfd;
int logdir_lockfd;
......
......@@ -92,6 +92,7 @@ typedef enum {
YDB_LAYER_NUM_DB_CLOSE,
YDB_LAYER_NUM_OPEN_DBS,
YDB_LAYER_MAX_OPEN_DBS,
YDB_LAYER_FSYNC_LOG_PERIOD,
#if 0
YDB_LAYER_ORIGINAL_ENV_VERSION, /* version of original environment, read from persistent environment */
YDB_LAYER_STARTUP_ENV_VERSION, /* version of environment at this startup, read from persistent environment (curr_env_ver_key) */
......@@ -127,6 +128,7 @@ ydb_layer_status_init (void) {
STATUS_INIT(YDB_LAYER_NUM_DB_CLOSE, UINT64, "db closes");
STATUS_INIT(YDB_LAYER_NUM_OPEN_DBS, UINT64, "num open dbs now");
STATUS_INIT(YDB_LAYER_MAX_OPEN_DBS, UINT64, "max open dbs");
STATUS_INIT(YDB_LAYER_FSYNC_LOG_PERIOD, UINT64, "period, in ms, that recovery log is automatically fsynced");
STATUS_VALUE(YDB_LAYER_TIME_STARTUP) = time(NULL);
ydb_layer_status.initialized = true;
......@@ -134,8 +136,9 @@ ydb_layer_status_init (void) {
#undef STATUS_INIT
static void
ydb_layer_get_status(YDB_LAYER_STATUS statp) {
ydb_layer_get_status(DB_ENV* env, YDB_LAYER_STATUS statp) {
STATUS_VALUE(YDB_LAYER_TIME_NOW) = time(NULL);
STATUS_VALUE(YDB_LAYER_FSYNC_LOG_PERIOD) = toku_minicron_get_period_in_ms_unlocked(&env->i->fsync_log_cron);
*statp = ydb_layer_status;
}
......@@ -314,7 +317,7 @@ env_fs_init(DB_ENV *env) {
// Initialize the minicron that polls file system space
static int
env_fs_init_minicron(DB_ENV *env) {
int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time, env_fs_poller, env);
int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
assert(r == 0);
env->i->fs_poller_is_init = true;
return r;
......@@ -330,6 +333,44 @@ env_fs_destroy(DB_ENV *env) {
}
}
static int
env_fsync_log_on_minicron(void *arg) {
DB_ENV *env = (DB_ENV *) arg;
int r = env->log_flush(env, 0);
assert(r == 0);
return 0;
}
static void
env_fsync_log_init(DB_ENV *env) {
env->i->fsync_log_period_ms = 0;
env->i->fsync_log_cron_is_init = false;
}
static void UU()
env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
env->i->fsync_log_period_ms = period_ms;
if (env->i->fsync_log_cron_is_init) {
toku_minicron_change_period(&env->i->fsync_log_cron, period_ms);
}
}
static void
env_fsync_log_cron_init(DB_ENV *env) {
int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
assert(r == 0);
env->i->fsync_log_cron_is_init = true;
}
static void
env_fsync_log_cron_destroy(DB_ENV *env) {
if (env->i->fsync_log_cron_is_init) {
int r = toku_minicron_shutdown(&env->i->fsync_log_cron);
assert(r == 0);
env->i->fsync_log_cron_is_init = false;
}
}
static void
env_setup_real_dir(DB_ENV *env, char **real_dir, const char *nominal_dir) {
toku_free(*real_dir);
......@@ -986,6 +1027,7 @@ env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
assert_zero(r);
env_fs_poller(env); // get the file system state at startup
env_fs_init_minicron(env);
env_fsync_log_cron_init(env);
cleanup:
if (r!=0) {
if (env && env->i) {
......@@ -1082,6 +1124,7 @@ env_close(DB_ENV * env, uint32_t flags) {
}
env_fs_destroy(env);
env_fsync_log_cron_destroy(env);
env->i->ltm.destroy();
if (env->i->data_dir)
toku_free(env->i->data_dir);
......@@ -1132,9 +1175,12 @@ env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
static int
env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
HANDLE_PANICKED_ENV(env);
// do nothing if no logger
if (env->i->logger) {
// We just flush everything. MySQL uses lsn == 0 which means flush everything.
// For anyone else using the log, it is correct to flush too much, so we are OK.
toku_logger_fsync(env->i->logger);
}
return 0;
}
......@@ -1493,7 +1539,7 @@ env_checkpointing_get_period(DB_ENV * env, uint32_t *seconds) {
int r = 0;
if (!env_opened(env)) r = EINVAL;
else
*seconds = toku_get_checkpoint_period(env->i->cachetable);
*seconds = toku_get_checkpoint_period_unlocked(env->i->cachetable);
return r;
}
......@@ -1503,7 +1549,7 @@ env_cleaner_get_period(DB_ENV * env, uint32_t *seconds) {
int r = 0;
if (!env_opened(env)) r = EINVAL;
else
*seconds = toku_get_cleaner_period(env->i->cachetable);
*seconds = toku_get_cleaner_period_unlocked(env->i->cachetable);
return r;
}
......@@ -1830,7 +1876,7 @@ env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t ma
{
YDB_LAYER_STATUS_S ydb_stat;
ydb_layer_get_status(&ydb_stat);
ydb_layer_get_status(env, &ydb_stat);
for (int i = 0; i < YDB_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
engstat[row++] = ydb_stat.status[i];
}
......@@ -2171,6 +2217,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(create_loader);
USENV(get_cursor_for_persistent_environment);
USENV(get_cursor_for_directory);
USENV(change_fsync_log_period);
#undef USENV
// unlocked methods
......@@ -2194,6 +2241,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
result->i->logdir_lockfd = -1;
result->i->tmpdir_lockfd = -1;
env_fs_init(result);
env_fsync_log_init(result);
result->i->bt_compare = toku_builtin_compare_fun;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment