Commit 7e81e8be authored by Dave Wells's avatar Dave Wells Committed by Yoni Fogel

beginnings of logic to trim log files if possible. No actual trimming done yet

git-svn-id: file:///svn/toku/tokudb@14380 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0860425e
...@@ -52,6 +52,7 @@ BRT_SOURCES = \ ...@@ -52,6 +52,7 @@ BRT_SOURCES = \
key \ key \
leafentry \ leafentry \
leaflock \ leaflock \
logfilemgr \
logger \ logger \
log_code \ log_code \
logcursor \ logcursor \
......
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
#include "log-internal.h" #include "log-internal.h"
#include "log_header.h" #include "log_header.h"
#include "logcursor.h" #include "logcursor.h"
#include "logfilemgr.h"
#include "mempool.h" #include "mempool.h"
#include "rbuf.h" #include "rbuf.h"
#include "threadpool.h" #include "threadpool.h"
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "toku_assert.h" #include "toku_assert.h"
#include "list.h" #include "list.h"
#include "memarena.h" #include "memarena.h"
#include "logfilemgr.h"
#include <stdio.h> #include <stdio.h>
#include <toku_pthread.h> #include <toku_pthread.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -76,6 +77,8 @@ struct tokulogger { ...@@ -76,6 +77,8 @@ struct tokulogger {
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write
int n_in_file; int n_in_file;
TOKULOGFILEMGR logfilemgr;
u_int32_t write_block_size; // How big should the blocks be written to various logs? u_int32_t write_block_size; // How big should the blocks be written to various logs?
TXNID oldest_living_xid; TXNID oldest_living_xid;
}; };
......
...@@ -81,7 +81,8 @@ static int lc_check_lsn(TOKULOGCURSOR lc, int dir) { ...@@ -81,7 +81,8 @@ static int lc_check_lsn(TOKULOGCURSOR lc, int dir) {
// toku_logcursor_create() // toku_logcursor_create()
// - returns a pointer to a logcursor // - returns a pointer to a logcursor
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
static int lc_create(TOKULOGCURSOR *lc, const char *log_dir) {
int failresult=0; int failresult=0;
int r=0; int r=0;
...@@ -89,7 +90,7 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -89,7 +90,7 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
TOKULOGCURSOR cursor = (TOKULOGCURSOR) toku_malloc(sizeof(struct toku_logcursor)); TOKULOGCURSOR cursor = (TOKULOGCURSOR) toku_malloc(sizeof(struct toku_logcursor));
if ( cursor == NULL ) { if ( cursor == NULL ) {
failresult = ENOMEM; failresult = ENOMEM;
goto fail; goto lc_create_fail;
} }
// find logfiles in logdir // find logfiles in logdir
cursor->is_open = FALSE; cursor->is_open = FALSE;
...@@ -100,52 +101,58 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) { ...@@ -100,52 +101,58 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1); cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1);
if ( cursor->logdir == NULL ) { if ( cursor->logdir == NULL ) {
failresult = ENOMEM; failresult = ENOMEM;
goto fail; goto lc_create_fail;
} }
sprintf(cursor->logdir, "%s", log_dir); sprintf(cursor->logdir, "%s", log_dir);
} else { } else {
char *cwd = getcwd(NULL, 0); char *cwd = getcwd(NULL, 0);
if ( cwd == NULL ) { if ( cwd == NULL ) {
failresult = -1; failresult = -1;
goto fail; goto lc_create_fail;
} }
cursor->logdir = (char *) toku_malloc(strlen(cwd)+strlen(log_dir)+2); cursor->logdir = (char *) toku_malloc(strlen(cwd)+strlen(log_dir)+2);
if ( cursor->logdir == NULL ) { if ( cursor->logdir == NULL ) {
toku_free(cwd); toku_free(cwd);
failresult = ENOMEM; failresult = ENOMEM;
goto fail; goto lc_create_fail;
} }
sprintf(cursor->logdir, "%s/%s", cwd, log_dir); sprintf(cursor->logdir, "%s/%s", cwd, log_dir);
toku_free(cwd); toku_free(cwd);
} }
cursor->logfiles = NULL; cursor->logfiles = NULL;
cursor->n_logfiles = 0; cursor->n_logfiles = 0;
r = toku_logger_find_logfiles(cursor->logdir, &(cursor->logfiles), &(cursor->n_logfiles));
if (r!=0) {
failresult=r;
goto fail;
}
cursor->cur_lsn.lsn=0; cursor->cur_lsn.lsn=0;
cursor->last_direction=LC_FIRST; cursor->last_direction=LC_FIRST;
*lc = cursor; *lc = cursor;
return r; return r;
fail:
lc_create_fail:
toku_logcursor_destroy(&cursor); toku_logcursor_destroy(&cursor);
*lc = NULL; *lc = NULL;
return failresult; return failresult;
} }
int toku_logcursor_create_for_file(TOKULOGCURSOR *lc, const char *log_dir, const char *log_file) { int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
int r=0; int r = lc_create(lc, log_dir);
int failresult=0; if ( r!=0 )
return r;
TOKULOGCURSOR cursor; TOKULOGCURSOR cursor = *lc;
r = toku_logcursor_create(&cursor, log_dir); r = toku_logger_find_logfiles(cursor->logdir, &(cursor->logfiles), &(cursor->n_logfiles));
if (r!=0) if (r!=0) {
toku_logcursor_destroy(&cursor);
*lc = NULL;
}
return r;
}
int toku_logcursor_create_for_file(TOKULOGCURSOR *lc, const char *log_dir, const char *log_file) {
int failresult = 0;
int r = lc_create(lc, log_dir);
if ( r!=0 )
return r; return r;
int idx; TOKULOGCURSOR cursor = *lc;
int found_it=0;
int fullnamelen = strlen(cursor->logdir) + strlen(log_file) + 3; int fullnamelen = strlen(cursor->logdir) + strlen(log_file) + 3;
char *log_file_fullname = toku_malloc(fullnamelen); char *log_file_fullname = toku_malloc(fullnamelen);
if ( log_file_fullname == NULL ) { if ( log_file_fullname == NULL ) {
...@@ -153,22 +160,15 @@ int toku_logcursor_create_for_file(TOKULOGCURSOR *lc, const char *log_dir, const ...@@ -153,22 +160,15 @@ int toku_logcursor_create_for_file(TOKULOGCURSOR *lc, const char *log_dir, const
goto fail; goto fail;
} }
sprintf(log_file_fullname, "%s/%s", cursor->logdir, log_file); sprintf(log_file_fullname, "%s/%s", cursor->logdir, log_file);
for(idx=0;idx<cursor->n_logfiles;idx++) {
if ( strcmp(log_file_fullname, cursor->logfiles[idx]) == 0 ) { cursor->n_logfiles=1;
found_it = 1;
break; char **logfiles = toku_malloc(sizeof(char**));
} if ( logfiles == NULL ) {
} failresult = ENOMEM;
if (found_it==0) {
failresult = DB_NOTFOUND;
goto fail; goto fail;
} }
// replace old logfile structure with single file version cursor->logfiles = logfiles;
int lf;
for(lf=0;lf<cursor->n_logfiles;lf++) {
toku_free(cursor->logfiles[lf]);
}
cursor->n_logfiles=1;
cursor->logfiles[0] = log_file_fullname; cursor->logfiles[0] = log_file_fullname;
*lc = cursor; *lc = cursor;
return r; return r;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: logcursor.c 13196 2009-07-10 14:41:51Z zardosht $"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
// for now, implement with singlely-linked-list
// first = oldest (delete from beginning)
// last = newest (add to end)
struct lfm_entry {
TOKULOGFILEINFO lf_info;
struct lfm_entry *next;
};
struct toku_logfilemgr {
struct lfm_entry *first;
struct lfm_entry *last;
int n_entries;
};
int toku_logfilemgr_create(TOKULOGFILEMGR *lfm) {
int failresult=0;
// malloc a logfilemgr
TOKULOGFILEMGR mgr = toku_malloc(sizeof(struct toku_logfilemgr));
// printf("toku_logfilemgr_create [%p]\n", mgr);
if ( mgr == NULL ) {
failresult = ENOMEM;
goto createfail;
}
mgr->first = NULL;
mgr->last = NULL;
mgr->n_entries = 0;
*lfm = mgr;
return 0;
createfail:
toku_logfilemgr_destroy(&mgr);
*lfm = NULL;
return failresult;
}
int toku_logfilemgr_destroy(TOKULOGFILEMGR *lfm) {
int r=0;
if ( *lfm != NULL ) { // be tolerant of being passed a NULL
TOKULOGFILEMGR mgr = *lfm;
// printf("toku_logfilemgr_destroy [%p], %d entries\n", mgr, mgr->n_entries);
while ( mgr->n_entries > 0 ) {
toku_logfilemgr_delete_oldest_logfile_info(mgr);
}
toku_free(*lfm);
*lfm = NULL;
}
return r;
}
int toku_logfilemgr_init(TOKULOGFILEMGR lfm, const char *log_dir) {
// printf("toku_logfilemgr_init [%p]\n", lfm);
assert(lfm!=0);
int r;
int n_logfiles;
char **logfiles;
r = toku_logger_find_logfiles(log_dir, &logfiles, &n_logfiles);
if (r!=0)
return r;
TOKULOGCURSOR cursor;
struct log_entry *entry;
TOKULOGFILEINFO lf_info;
long long index = -1;
char *basename;
for(int i=0;i<n_logfiles;i++){
lf_info = toku_malloc(sizeof(struct toku_logfile_info));
if ( lf_info == NULL ) {
return ENOMEM;
}
// find the index
basename = strrchr(logfiles[i], '/') + 1;
r = sscanf(basename, "log%lld.tokulog", &index);
assert(r==1); // found index
lf_info->index = index;
// find last LSN
r = toku_logcursor_create_for_file(&cursor, log_dir, basename);
toku_logcursor_last(cursor, &entry);
lf_info->maxlsn = toku_log_entry_get_lsn(entry);
// add to logfilemgr
toku_logfilemgr_add_logfile_info(lfm, lf_info);
toku_logcursor_destroy(&cursor);
}
for(int i=0;i<n_logfiles;i++) {
toku_free(logfiles[i]);
}
toku_free(logfiles);
return 0;
}
int toku_logfilemgr_num_logfiles(TOKULOGFILEMGR lfm) {
assert(lfm!=NULL);
return lfm->n_entries;
}
int toku_logfilemgr_add_logfile_info(TOKULOGFILEMGR lfm, TOKULOGFILEINFO lf_info) {
assert(lfm!=NULL);
struct lfm_entry *entry = toku_malloc(sizeof(struct lfm_entry));
// printf("toku_logfilemgr_add_logfile_info [%p] : entry [%p]\n", lfm, entry);
int r=0;
if ( entry != NULL ) {
entry->lf_info=lf_info;
entry->next = NULL;
if ( lfm->n_entries != 0 )
lfm->last->next = entry;
lfm->last = entry;
lfm->n_entries++;
if (lfm->n_entries == 1 ) {
lfm->first = lfm->last;
}
}
else
r = ENOMEM;
return r;
}
TOKULOGFILEINFO toku_logfilemgr_get_oldest_logfile_info(TOKULOGFILEMGR lfm) {
assert(lfm!=NULL);
return lfm->first->lf_info;
}
void toku_logfilemgr_delete_oldest_logfile_info(TOKULOGFILEMGR lfm) {
assert(lfm!=NULL);
if ( lfm->n_entries > 0 ) {
struct lfm_entry *entry = lfm->first;
// printf("toku_logfilemgr_delete_oldest_logfile_info [%p] : entry [%p]\n", lfm, entry);
toku_free(entry->lf_info);
lfm->first = entry->next;
toku_free(entry);
lfm->n_entries--;
if ( lfm->n_entries == 0 ) {
lfm->last = lfm->first = NULL;
}
}
}
LSN toku_logfilemgr_get_last_lsn(TOKULOGFILEMGR lfm) {
assert(lfm!=NULL);
if ( lfm->n_entries == 0 ) {
LSN lsn;
lsn.lsn = 0;
return lsn;
}
return lfm->last->lf_info->maxlsn;
}
void toku_logfilemgr_update_last_lsn(TOKULOGFILEMGR lfm, LSN lsn) {
assert(lfm!=NULL);
assert(lfm->last!=NULL);
lfm->last->lf_info->maxlsn = lsn;
}
void toku_logfilemgr_print(TOKULOGFILEMGR lfm) {
assert(lfm!=NULL);
printf("toku_logfilemgr_print [%p] : %d entries \n", lfm, lfm->n_entries);
int i;
struct lfm_entry *entry = lfm->first;
for (i=0;i<lfm->n_entries;i++) {
printf(" entry %d : index = %lld, maxlsn = %lu\n", i, entry->lf_info->index, entry->lf_info->maxlsn.lsn);
entry = entry->next;
}
}
#ifndef TOKULOGFILEMGR_H
#define TOKULOGFILEMGR_H
#ident "$Id: logfilemgr.h 12375 2009-05-28 14:14:47Z wells $"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "log_header.h"
// this is the basic information we need to keep per logfile
struct toku_logfile_info {
long long index;
LSN maxlsn;
};
typedef struct toku_logfile_info *TOKULOGFILEINFO;
struct toku_logfilemgr;
typedef struct toku_logfilemgr *TOKULOGFILEMGR;
int toku_logfilemgr_create(TOKULOGFILEMGR *lfm);
int toku_logfilemgr_destroy(TOKULOGFILEMGR *lfm);
int toku_logfilemgr_init(TOKULOGFILEMGR lfm, const char *log_dir);
int toku_logfilemgr_num_logfiles(TOKULOGFILEMGR lfm);
int toku_logfilemgr_add_logfile_info(TOKULOGFILEMGR lfm, TOKULOGFILEINFO lf_info);
TOKULOGFILEINFO toku_logfilemgr_get_oldest_logfile_info(TOKULOGFILEMGR lfm);
void toku_logfilemgr_delete_oldest_logfile_info(TOKULOGFILEMGR lfm);
LSN toku_logfilemgr_get_last_lsn(TOKULOGFILEMGR lfm);
void toku_logfilemgr_update_last_lsn(TOKULOGFILEMGR lfm, LSN lsn);
void toku_logfilemgr_print(TOKULOGFILEMGR lfm);
#endif //TOKULOGFILEMGR_H
...@@ -11,6 +11,7 @@ static toku_pthread_mutex_t logger_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER; ...@@ -11,6 +11,7 @@ static toku_pthread_mutex_t logger_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static int (*toku_os_fsync_function)(int)=fsync; static int (*toku_os_fsync_function)(int)=fsync;
static int open_logfile (TOKULOGGER logger); static int open_logfile (TOKULOGGER logger);
static int delete_logfile(TOKULOGGER logger, long long index);
static int do_write (TOKULOGGER logger, int do_fsync); static int do_write (TOKULOGGER logger, int do_fsync);
int toku_logger_create (TOKULOGGER *resultp) { int toku_logger_create (TOKULOGGER *resultp) {
...@@ -30,6 +31,7 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -30,6 +31,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->checkpoint_lsn=(LSN){0}; result->checkpoint_lsn=(LSN){0};
result->oldest_living_xid = TXNID_NONE_LIVING; result->oldest_living_xid = TXNID_NONE_LIVING;
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
toku_logfilemgr_create(&result->logfilemgr);
*resultp=result; *resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died1; r = ml_init(&result->input_lock); if (r!=0) goto died1;
r = ml_init(&result->output_lock); if (r!=0) goto died2; r = ml_init(&result->output_lock); if (r!=0) goto died2;
...@@ -49,19 +51,10 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) { ...@@ -49,19 +51,10 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
int r; int r;
TOKULOGCURSOR logcursor; r = toku_logfilemgr_init(logger->logfilemgr, directory);
struct log_entry *le; if ( r!=0 )
logger->lsn.lsn=0; return r;
// In searching existing logfiles for last LSN, have chosen to logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
// ignore errors that may occur. In the event of a return error,
// revert to using LSN=0
r = toku_logcursor_create(&logcursor, directory);
if (r==0) {
r = toku_logcursor_last(logcursor, &le);
if (r==0)
logger->lsn = toku_log_entry_get_lsn(le);
toku_logcursor_destroy(&logcursor);
}
//printf("starting after LSN=%lu\n", logger->lsn.lsn); //printf("starting after LSN=%lu\n", logger->lsn.lsn);
logger->written_lsn = logger->lsn; logger->written_lsn = logger->lsn;
logger->fsynced_lsn = logger->lsn; logger->fsynced_lsn = logger->lsn;
...@@ -87,7 +80,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) { ...@@ -87,7 +80,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
open_logfile(logger); open_logfile(logger);
logger->is_open = 1; logger->is_open = 1;
return 0; return 0;
} }
...@@ -160,6 +152,7 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -160,6 +152,7 @@ int toku_logger_close(TOKULOGGER *loggerp) {
logger->is_panicked=1; // Just in case this might help. logger->is_panicked=1; // Just in case this might help.
if (logger->directory) toku_free(logger->directory); if (logger->directory) toku_free(logger->directory);
toku_omt_destroy(&logger->live_txns); toku_omt_destroy(&logger->live_txns);
toku_logfilemgr_destroy(&logger->logfilemgr);
toku_free(logger); toku_free(logger);
*loggerp=0; *loggerp=0;
if (locked_logger) { if (locked_logger) {
...@@ -344,6 +337,7 @@ static int open_logfile (TOKULOGGER logger) { ...@@ -344,6 +337,7 @@ static int open_logfile (TOKULOGGER logger) {
int fnamelen = strlen(logger->directory)+50; int fnamelen = strlen(logger->directory)+50;
char fname[fnamelen]; char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number); snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number);
long long index = logger->next_log_file_number;
if (logger->write_log_files) { if (logger->write_log_files) {
logger->fd = open(fname, O_CREAT+O_WRONLY+O_TRUNC+O_EXCL+O_BINARY, S_IRWXU); logger->fd = open(fname, O_CREAT+O_WRONLY+O_TRUNC+O_EXCL+O_BINARY, S_IRWXU);
if (logger->fd==-1) return errno; if (logger->fd==-1) return errno;
...@@ -356,11 +350,49 @@ static int open_logfile (TOKULOGGER logger) { ...@@ -356,11 +350,49 @@ static int open_logfile (TOKULOGGER logger) {
r = write_it(logger->fd, "tokulogg", 8); if (r!=8) return errno; r = write_it(logger->fd, "tokulogg", 8); if (r!=8) return errno;
int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
r = write_it(logger->fd, &version_l, 4); if (r!=4) return errno; r = write_it(logger->fd, &version_l, 4); if (r!=4) return errno;
TOKULOGFILEINFO lf_info = toku_malloc(sizeof(struct toku_logfile_info));
if (lf_info == NULL)
return ENOMEM;
lf_info->index = index;
lf_info->maxlsn = logger->written_lsn; // ?? not sure this is right, but better than 0 - DSW
toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info);
logger->fsynced_lsn = logger->written_lsn; logger->fsynced_lsn = logger->written_lsn;
logger->n_in_file = 12; logger->n_in_file = 12;
return 0; return 0;
} }
static int delete_logfile(TOKULOGGER logger, long long index) {
int fnamelen = strlen(logger->directory)+50;
char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, index);
int r = remove(fname);
return r;
}
int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN oldest_open_lsn) {
int r=0;
TOKULOGFILEMGR lfm = logger->logfilemgr;
int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
TOKULOGFILEINFO lf_info = NULL;
while ( n_logfiles > 1 ) { // don't delete current logfile
lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm);
if ( lf_info->maxlsn.lsn > oldest_open_lsn.lsn ) {
// file contains an open LSN, can't delete this or any newer log files
break;
}
// need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info
long long index = lf_info->index;
toku_logfilemgr_delete_oldest_logfile_info(lfm);
n_logfiles--;
r = delete_logfile(logger, index);
if (r!=0) {
return r;
}
}
return r;
}
static int close_and_open_logfile (TOKULOGGER logger) { static int close_and_open_logfile (TOKULOGGER logger) {
int r; int r;
if (logger->write_log_files) { if (logger->write_log_files) {
...@@ -426,6 +458,7 @@ static int do_write (TOKULOGGER logger, int do_fsync) { ...@@ -426,6 +458,7 @@ static int do_write (TOKULOGGER logger, int do_fsync) {
} }
logger->fsynced_lsn = logger->written_lsn; logger->fsynced_lsn = logger->written_lsn;
} }
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
return 0; return 0;
panic: panic:
toku_logger_panic(logger, r); toku_logger_panic(logger, r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment