Commit 296fca6b authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Contrary to r18222, the new group commit code was not on the main truck.

{{{
svn merge -r17893:18056 https://svn.tokutek.com/tokudb/toku/tokudb.2370c
}}}

Refs #2370, #2385.  [t:2370] [t:2385].


git-svn-id: file:///svn/toku/tokudb@18259 c7de825b-a66e-492c-adef-691d508d4ae1
parent e599eadb
...@@ -427,7 +427,6 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__ ...@@ -427,7 +427,6 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf(" u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */ \n"); printf(" u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */ \n");
printf(" u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */ \n"); printf(" u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */ \n");
printf(" u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */ \n"); printf(" u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */ \n");
printf(" u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */ \n");
printf(" u_int32_t checkpoint_period; /* delay between automatic checkpoints */ \n"); printf(" u_int32_t checkpoint_period; /* delay between automatic checkpoints */ \n");
printf(" u_int32_t checkpoint_footprint; /* state of checkpoint procedure */ \n"); printf(" u_int32_t checkpoint_footprint; /* state of checkpoint procedure */ \n");
printf(" char checkpoint_time_begin[26]; /* time of last checkpoint begin */ \n"); printf(" char checkpoint_time_begin[26]; /* time of last checkpoint begin */ \n");
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <errno.h> #include <errno.h>
#include <toku_assert.h> #include <toku_assert.h>
#include <stdio.h> #include <stdio.h>
#include <string.h>
//Print any necessary errors //Print any necessary errors
//Return whether we should try the write again. //Return whether we should try the write again.
...@@ -130,19 +131,29 @@ toku_set_func_fsync(int (*fsync_function)(int)) { ...@@ -130,19 +131,29 @@ toku_set_func_fsync(int (*fsync_function)(int)) {
return 0; return 0;
} }
// keep trying if fsync fails because of EINTR
int int
toku_file_fsync(int fd) { toku_file_fsync_without_accounting (int fd) {
int r = -1; int r = -1;
uint64_t tstart = get_tnow();
while (r != 0) { while (r != 0) {
if (t_fsync) if (t_fsync)
r = t_fsync(fd); r = t_fsync(fd);
else else
r = fsync(fd); r = fsync(fd);
if (r) if (r) {
assert(errno==EINTR); int rr = errno;
if (rr!=EINTR) printf("rr=%d (%s)\n", rr, strerror(rr));
assert(rr==EINTR);
} }
}
return r;
}
// keep trying if fsync fails because of EINTR
int
toku_file_fsync(int fd) {
uint64_t tstart = get_tnow();
int r = toku_file_fsync_without_accounting(fd);
toku_sync_fetch_and_increment_uint64(&toku_fsync_count); toku_sync_fetch_and_increment_uint64(&toku_fsync_count);
toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart); toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart);
return r; return r;
......
...@@ -5459,8 +5459,6 @@ toku_brt_lock_init(void) { ...@@ -5459,8 +5459,6 @@ toku_brt_lock_init(void) {
int r = 0; int r = 0;
if (r==0) if (r==0)
r = toku_pwrite_lock_init(); r = toku_pwrite_lock_init();
if (r==0)
r = toku_logger_lock_init();
return r; return r;
} }
...@@ -5469,8 +5467,6 @@ toku_brt_lock_destroy(void) { ...@@ -5469,8 +5467,6 @@ toku_brt_lock_destroy(void) {
int r = 0; int r = 0;
if (r==0) if (r==0)
r = toku_pwrite_lock_destroy(); r = toku_pwrite_lock_destroy();
if (r==0)
r = toku_logger_lock_destroy();
return r; return r;
} }
......
...@@ -59,7 +59,12 @@ struct logbuf { ...@@ -59,7 +59,12 @@ struct logbuf {
struct tokulogger { struct tokulogger {
enum typ_tag tag; // must be first enum typ_tag tag; // must be first
struct mylock input_lock, output_lock; // acquired in that order struct mylock input_lock;
toku_pthread_mutex_t output_condition_lock; // if you need both this lock and input_lock, acquire the output_lock first, then input_lock. More typical is to get the output_is_available condition to be false, and then acquire the input_lock.
toku_pthread_cond_t output_condition; //
BOOL output_is_available; // this is part of the predicate for the output condition. It's true if no thread is modifying the output (either doing an fsync or otherwise fiddling with the output).
BOOL is_open; BOOL is_open;
BOOL is_panicked; BOOL is_panicked;
BOOL write_log_files; BOOL write_log_files;
...@@ -75,14 +80,15 @@ struct tokulogger { ...@@ -75,14 +80,15 @@ struct tokulogger {
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that? OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
struct logbuf inbuf; // data being accumulated for the write struct logbuf inbuf; // data being accumulated for the write
// To access these, you must have the output lock // To access these, you must have the output condition lock.
LSN written_lsn; // the last lsn written LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry (accessed only while holding the output lock, and updated only when the output lock and output permission are held)
LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint. LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint.
long long next_log_file_number; long long next_log_file_number;
struct logbuf outbuf; // data being written to the file struct logbuf outbuf; // data being written to the file
int n_in_file; // The amount of data in the current file int n_in_file; // The amount of data in the current file
// To access the logfilemgr you must have the output condition lock.
TOKULOGFILEMGR logfilemgr; TOKULOGFILEMGR logfilemgr;
u_int32_t write_block_size; // How big should the blocks be written to various logs? u_int32_t write_block_size; // How big should the blocks be written to various logs?
......
This diff is collapsed.
...@@ -12,8 +12,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger); ...@@ -12,8 +12,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger);
int toku_logger_shutdown(TOKULOGGER logger); int toku_logger_shutdown(TOKULOGGER logger);
int toku_logger_close(TOKULOGGER *loggerp); int toku_logger_close(TOKULOGGER *loggerp);
u_int32_t toku_logger_get_lock_ctr(void);
int toku_logger_fsync (TOKULOGGER logger); int toku_logger_fsync (TOKULOGGER logger);
void toku_logger_panic (TOKULOGGER logger, int err); void toku_logger_panic (TOKULOGGER logger, int err);
int toku_logger_panicked(TOKULOGGER logger); int toku_logger_panicked(TOKULOGGER logger);
......
...@@ -931,7 +931,10 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) { ...@@ -931,7 +931,10 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
} }
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn, NULL, NULL); r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn,
NULL, NULL,
// No need to release locks during recovery.
NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
......
...@@ -78,12 +78,16 @@ died: ...@@ -78,12 +78,16 @@ died:
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN, poll, poll_extra); void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk) {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN,
poll, poll_extra,
release_locks, reacquire_locks, locks_thunk);
} }
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk) {
int r; int r;
// panic handled in log_commit // panic handled in log_commit
...@@ -93,7 +97,9 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -93,7 +97,9 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
txn->progress_poll_fun = poll; txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra; txn->progress_poll_fun_extra = poll_extra;
if (release_locks) release_locks(locks_thunk);
r = toku_log_commit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks. r = toku_log_commit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (reacquire_locks) reacquire_locks(locks_thunk);
if (r!=0) if (r!=0)
return r; return r;
r = toku_rollback_commit(txn, yield, yieldv, oplsn); r = toku_rollback_commit(txn, yield, yieldv, oplsn);
......
...@@ -9,9 +9,11 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -9,9 +9,11 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid); int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
......
...@@ -174,7 +174,6 @@ BDB_BINS = $(patsubst %.c,%.bdb$(BINSUF),$(filter-out $(patsubst %,%.c,$(BDB_DON ...@@ -174,7 +174,6 @@ BDB_BINS = $(patsubst %.c,%.bdb$(BINSUF),$(filter-out $(patsubst %,%.c,$(BDB_DON
endif endif
TDB_TESTS_THAT_SHOULD_FAIL= \ TDB_TESTS_THAT_SHOULD_FAIL= \
test_groupcommit_count \
test944 \ test944 \
test_truncate_txn_abort \ test_truncate_txn_abort \
test_db_no_env test_db_no_env
...@@ -215,6 +214,8 @@ TLRECOVER = 2 3 4 5 6 7 8 9 10 ...@@ -215,6 +214,8 @@ TLRECOVER = 2 3 4 5 6 7 8 9 10
EXTRA_TDB_TESTS = \ EXTRA_TDB_TESTS = \
$(patsubst %,test_log%.recover,$(TLRECOVER)) \ $(patsubst %,test_log%.recover,$(TLRECOVER)) \
test_groupcommit_count_hgrind.tdbrun \
test_groupcommit_count_vgrind.tdbrun \
#\ ends prev line #\ ends prev line
ifeq ($(OS_CHOICE),windows) ifeq ($(OS_CHOICE),windows)
...@@ -602,6 +603,12 @@ helgrind2.bdbrun: BDBVGRIND=$(HGRIND) ...@@ -602,6 +603,12 @@ helgrind2.bdbrun: BDBVGRIND=$(HGRIND)
helgrind3.tdbrun: TDBVGRIND=$(HGRIND) helgrind3.tdbrun: TDBVGRIND=$(HGRIND)
helgrind3.bdbrun: BDBVGRIND=$(HGRIND) helgrind3.bdbrun: BDBVGRIND=$(HGRIND)
test_groupcommit_count_hgrind.tdbrun: HGRIND+=--suppressions=helgrind.suppressions
test_groupcommit_count_hgrind.tdbrun: test_groupcommit_count.tdb$(BINSUF)
$(HGRIND) ./test_groupcommit_count.tdb$(BINSUF) $(VERBVERBOSE) -n 1 -p hgrind $(SUMMARIZE_CMD)
test_groupcommit_count_vgrind.tdbrun: test_groupcommit_count.tdb$(BINSUF)
$(VGRIND) ./test_groupcommit_count.tdb$(BINSUF) $(VERBVERBOSE) -n 1 -p vgrind $(SUMMARIZE_CMD)
# we deliberately don't close the env, so recovery runs # we deliberately don't close the env, so recovery runs
# lets avoid all of the valgrind errors # lets avoid all of the valgrind errors
test-recover1.tdbrun: VGRIND= test-recover1.tdbrun: VGRIND=
......
{
helgrind_3.5.0_false_positive_against_pthread_create
Helgrind:Race
fun:mythread_wrapper
}
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
DB_ENV *env; DB_ENV *env;
DB *db; DB *db;
int do_sync=1;
#define NITER 100 #define NITER 100
...@@ -28,18 +29,20 @@ static void *start_a_thread (void *i_p) { ...@@ -28,18 +29,20 @@ static void *start_a_thread (void *i_p) {
dbt_init(&key, keystr, 1+strlen(keystr)), dbt_init(&key, keystr, 1+strlen(keystr)),
dbt_init(&data, keystr, 1+strlen(keystr)), dbt_init(&data, keystr, 1+strlen(keystr)),
0); 0);
r=tid->commit(tid, 0); CKERR(r); r=tid->commit(tid, do_sync ? 0 : DB_TXN_NOSYNC); CKERR(r);
} }
return 0; return 0;
} }
char *env_path;
static void static void
test_groupcommit (int nthreads) { test_groupcommit (int nthreads) {
int r; int r;
DB_TXN *tid; DB_TXN *tid;
r=db_env_create(&env, 0); assert(r==0); r=db_env_create(&env, 0); assert(r==0);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=env->open(env, env_path, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=db_create(&db, env, 0); CKERR(r); r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0); r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -59,6 +62,7 @@ test_groupcommit (int nthreads) { ...@@ -59,6 +62,7 @@ test_groupcommit (int nthreads) {
r=db->close(db, 0); assert(r==0); r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0); r=env->close(env, 0); assert(r==0);
//if (verbose) printf(" That's a total of %d commits\n", nthreads*NITER);
} }
// helgrind doesn't understand that pthread_join removes a race condition. I'm not impressed... -Bradley // helgrind doesn't understand that pthread_join removes a race condition. I'm not impressed... -Bradley
...@@ -88,42 +92,102 @@ static struct timeval prevtime; ...@@ -88,42 +92,102 @@ static struct timeval prevtime;
static int prev_count; static int prev_count;
static void static void
printtdiff (char *str) { printtdiff (int N) {
struct timeval thistime; struct timeval thistime;
gettimeofday(&thistime, 0); gettimeofday(&thistime, 0);
double diff = toku_tdiff(&thistime, &prevtime); double diff = toku_tdiff(&thistime, &prevtime);
int fcount=get_fsync_count(); int fcount=get_fsync_count();
if (verbose) printf("%s: %10.6fs %d fsyncs for %s\n", progname, diff, fcount-prev_count, str); if (verbose) printf("%s: %10.6fs %4d fsyncs for %4d threads %s %8.1f tps, %8.1f tps/thread\n", progname, diff, fcount-prev_count,
N,
do_sync ? "with sync " : "with DB_TXN_NOSYNC",
NITER*(N/diff), NITER/diff);
prevtime=thistime; prevtime=thistime;
prev_count=fcount; prev_count=fcount;
} }
static void
do_test (int N) {
for (do_sync = 0; do_sync<2; do_sync++) {
int count_before = get_fsync_count();
test_groupcommit(N);
printtdiff(N);
if (get_fsync_count()-count_before>= N*NITER) {
if (verbose) printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n");
exit(1);
}
}
}
int log_max_n_threads_over_10 = 3;
static void
my_parse_args (int argc, char *argv[]) {
verbose=1; // use -q to turn off the talking.
env_path = toku_strdup(ENVDIR);
const char *argv0=argv[0];
while (argc>1) {
int resultcode=0;
if (strcmp(argv[1], "-v")==0) {
verbose++;
} else if (strcmp(argv[1],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[1],"-n")==0) {
argc--;
argv++;
if (argc<=1) { resultcode=1; goto do_usage; }
errno = 0;
char *end;
log_max_n_threads_over_10 = strtol(argv[1], &end, 10);
if (errno!=0 || *end) {
resultcode=1;
goto do_usage;
}
} else if (strcmp(argv[1],"-p")==0) {
argc--;
argv++;
if (argc<=1) { resultcode=1; goto do_usage; }
int size = strlen(ENVDIR) + 10 + strlen(argv[1]);
REALLOC_N(size, env_path);
assert(env_path);
snprintf(env_path, size, "%s.%s", ENVDIR, argv[1]);
} else if (strcmp(argv[1], "-h")==0) {
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q] [-n LOG(MAX_N_THREADS/10)] [-h]\n", argv0);
exit(resultcode);
} else {
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int int
test_main (int argc, char *const argv[]) { test_main (int argc, char *const argv[]) {
progname=argv[0]; progname=argv[0];
parse_args(argc, argv); my_parse_args(argc, argv);
gettimeofday(&prevtime, 0); gettimeofday(&prevtime, 0);
prev_count=0; prev_count=0;
{ int r = db_env_set_func_fsync(do_fsync); CKERR(r); } { int r = db_env_set_func_fsync(do_fsync); CKERR(r); }
system("rm -rf " ENVDIR); {
{ int r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); } int size=20+strlen(env_path);
char command[size];
test_groupcommit(1); printtdiff("1 thread"); snprintf(command, size, "rm -rf %s", env_path);
test_groupcommit(2); printtdiff("2 threads"); system(command);
int count_before_10 = get_fsync_count();
test_groupcommit(10); printtdiff("10 threads");
if (get_fsync_count()-count_before_10 >= 10*NITER) {
if (verbose) printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n");
exit(1);
} }
int count_before_20 = get_fsync_count(); { int r=toku_os_mkdir(env_path, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); }
test_groupcommit(20); printtdiff("20 threads");
if (get_fsync_count()-count_before_20 >= 20*NITER) { test_groupcommit(1); printtdiff(1);
if (verbose) printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n"); test_groupcommit(2); printtdiff(2);
exit(1); for (int i=0; i<log_max_n_threads_over_10; i++) {
do_test(10 << i);
} }
toku_free(env_path);
return 0; return 0;
} }
...@@ -91,6 +91,7 @@ printtdiff (char *str) { ...@@ -91,6 +91,7 @@ printtdiff (char *str) {
struct timeval thistime; struct timeval thistime;
gettimeofday(&thistime, 0); gettimeofday(&thistime, 0);
if (verbose) printf("%10.6f %s\n", toku_tdiff(&thistime, &prevtime), str); if (verbose) printf("%10.6f %s\n", toku_tdiff(&thistime, &prevtime), str);
prevtime = thistime;
} }
int int
......
...@@ -1323,8 +1323,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) { ...@@ -1323,8 +1323,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) {
time_t now = time(NULL); time_t now = time(NULL);
format_time(&now, engstat->now); format_time(&now, engstat->now);
engstat->logger_lock_ctr = toku_logger_get_lock_ctr();
{ {
SCHEDULE_STATUS_S schedstat; SCHEDULE_STATUS_S schedstat;
toku_ydb_lock_get_status(&schedstat); toku_ydb_lock_get_status(&schedstat);
...@@ -1418,7 +1416,6 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) { ...@@ -1418,7 +1416,6 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
n += snprintf(buff + n, bufsiz - n, "time_ydb_lock_held_unavailable %"PRIu64"\n", engstat.time_ydb_lock_held_unavailable); n += snprintf(buff + n, bufsiz - n, "time_ydb_lock_held_unavailable %"PRIu64"\n", engstat.time_ydb_lock_held_unavailable);
n += snprintf(buff + n, bufsiz - n, "max_time_ydb_lock_held %"PRIu64"\n", engstat.max_time_ydb_lock_held); n += snprintf(buff + n, bufsiz - n, "max_time_ydb_lock_held %"PRIu64"\n", engstat.max_time_ydb_lock_held);
n += snprintf(buff + n, bufsiz - n, "total_time_ydb_lock_held %"PRIu64"\n", engstat.total_time_ydb_lock_held); n += snprintf(buff + n, bufsiz - n, "total_time_ydb_lock_held %"PRIu64"\n", engstat.total_time_ydb_lock_held);
n += snprintf(buff + n, bufsiz - n, "logger_lock_ctr %"PRIu64"\n", engstat.logger_lock_ctr);
n += snprintf(buff + n, bufsiz - n, "checkpoint_period %d \n", engstat.checkpoint_period); n += snprintf(buff + n, bufsiz - n, "checkpoint_period %d \n", engstat.checkpoint_period);
n += snprintf(buff + n, bufsiz - n, "checkpoint_footprint %d \n", engstat.checkpoint_footprint); n += snprintf(buff + n, bufsiz - n, "checkpoint_footprint %d \n", engstat.checkpoint_footprint);
n += snprintf(buff + n, bufsiz - n, "checkpoint_time_begin %s \n", engstat.checkpoint_time_begin); n += snprintf(buff + n, bufsiz - n, "checkpoint_time_begin %s \n", engstat.checkpoint_time_begin);
...@@ -1608,6 +1605,15 @@ static void ydb_yield (voidfp f, void *UU(v)) { ...@@ -1608,6 +1605,15 @@ static void ydb_yield (voidfp f, void *UU(v)) {
toku_ydb_lock(); toku_ydb_lock();
} }
static void release_ydb_lock_callback (void *ignore __attribute__((__unused__))) {
//printf("%8.6fs Thread %ld release\n", get_tdiff(), pthread_self());
toku_ydb_unlock();
}
static void reacquire_ydb_lock_callback (void *ignore __attribute__((__unused__))) {
//printf("%8.6fs Thread %ld reacquire\n", get_tdiff(), pthread_self());
toku_ydb_lock();
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
if (!txn) return EINVAL; if (!txn) return EINVAL;
...@@ -1648,7 +1654,9 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, ...@@ -1648,7 +1654,9 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally // Calls ydb_yield(NULL) occasionally
//r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL); //r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL, poll, poll_extra); r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
poll, poll_extra,
release_ydb_lock_callback, reacquire_ydb_lock_callback, NULL);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r; txn->mgrp->i->is_panicked = r;
...@@ -5292,8 +5300,10 @@ include_toku_pthread_yield (void) { ...@@ -5292,8 +5300,10 @@ include_toku_pthread_yield (void) {
// For test purposes only, translate dname to iname // For test purposes only, translate dname to iname
static int static int
env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) { env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
toku_ydb_lock();
DB *directory = env->i->directory; DB *directory = env->i->directory;
int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_PRELOCKED); // allocates memory for iname int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_PRELOCKED); // allocates memory for iname
toku_ydb_unlock();
return r; return r;
} }
......
...@@ -139,6 +139,7 @@ void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__v ...@@ -139,6 +139,7 @@ void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__v
int toku_os_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default"))); int toku_os_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default")));
// wrapper around fsync // wrapper around fsync
int toku_file_fsync_without_accounting(int fd);
int toku_file_fsync(int fd); int toku_file_fsync(int fd);
// get the number of fsync calls and the fsync times // get the number of fsync calls and the fsync times
......
...@@ -249,9 +249,8 @@ static uint64_t get_tnow(void) { ...@@ -249,9 +249,8 @@ static uint64_t get_tnow(void) {
// keep trying if fsync fails because of EINTR // keep trying if fsync fails because of EINTR
int int
toku_file_fsync(int fd) { toku_file_fsync_without_accounting (int fd) {
int r = -1; int r = -1;
uint64_t tstart = get_tnow();
while (r != 0) { while (r != 0) {
if (t_fsync) if (t_fsync)
r = t_fsync(fd); r = t_fsync(fd);
...@@ -260,6 +259,13 @@ toku_file_fsync(int fd) { ...@@ -260,6 +259,13 @@ toku_file_fsync(int fd) {
if (r) if (r)
assert(errno==EINTR); assert(errno==EINTR);
} }
return r;
}
int
toku_file_fsync(int fd) {
uint64_t tstart = get_tnow();
int r = toku_file_fsync_without_accounting(fd);
#if TOKU_WINDOWS_HAS_ATOMIC_64 #if TOKU_WINDOWS_HAS_ATOMIC_64
toku_sync_fetch_and_increment_uint64(&toku_fsync_count); toku_sync_fetch_and_increment_uint64(&toku_fsync_count);
toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart); toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment