Commit 00bdfaff authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #61, several code simplifications:

 - break up cachetable_flush_cachefile into more digestable functions,
 - decouple hash_id from filenum
 - break up close_userdata into close_userdata and free_userdata
parent 9aec732c
......@@ -184,7 +184,7 @@ struct cachefile {
// they are managed whenever we add or remove a pair from
// the cachetable. As of Riddler, this linked list is only used to
// make cachetable_flush_cachefile more efficient
PAIR cf_head;
PAIR cf_head; // doubly linked list that is NOT circular
uint32_t num_pairs; // count on number of pairs in the cachetable belong to this cachefile
bool for_checkpoint; //True if part of the in-progress checkpoint
......@@ -209,6 +209,7 @@ struct cachefile {
void *userdata;
void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files.
void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function.
void (*free_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function.
void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function.
void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function.
......@@ -427,11 +428,13 @@ public:
void add_cf_unlocked(CACHEFILE newcf);
void remove_cf(CACHEFILE cf);
FILENUM reserve_filenum();
uint32_t get_new_hash_id_unlocked();
CACHEFILE find_cachefile_unlocked(struct fileid* fileid);
void verify_unused_filenum(FILENUM filenum);
// access to these fields are protected by the lock
CACHEFILE m_head;
CACHEFILE m_active_head; // head of CACHEFILEs that are active
FILENUM m_next_filenum_to_use;
uint32_t m_next_hash_id_to_use;
toku_pthread_rwlock_t m_lock; // this field is publoc so we are still POD
};
......
......@@ -378,10 +378,11 @@ static void create_new_cachefile(
CACHEFILE newcf = NULL;
XCALLOC(newcf);
newcf->cachetable = ct;
newcf->filenum = filenum;
newcf->hash_id = hash_id;
newcf->fd = fd;
newcf->fileid = fileid;
newcf->filenum = filenum;
newcf->fd = fd;
newcf->fname_in_env = toku_xstrdup(fname_in_env);
bjm_init(&newcf->bjm);
*cfptr = newcf;
......@@ -413,8 +414,15 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
goto exit;
}
ct->cf_list.verify_unused_filenum(filenum);
create_new_cachefile(ct, filenum, filenum.fileid, fd, fname_in_env, fileid, &newcf);
create_new_cachefile(
ct,
filenum,
ct->cf_list.get_new_hash_id_unlocked(),
fd,
fname_in_env,
fileid,
&newcf
);
ct->cf_list.add_cf_unlocked(newcf);
......@@ -468,7 +476,9 @@ void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
// Call the close userdata callback to notify the client this cachefile
// and its underlying file are going to be closed
if (cf->close_userdata) {
invariant(cf->free_userdata);
cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn);
cf->free_userdata(cf, cf->userdata);
}
ct->cf_list.remove_cf(cf);
......@@ -2381,38 +2391,16 @@ static void remove_pair_for_close(PAIR p, CACHETABLE ct) {
cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p);
}
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL.
// Must be holding cachetable lock on entry.
//
// This function assumes that no client thread is accessing or
// trying to access the cachefile while this function is executing.
// This implies no client thread will be trying to lock any nodes
// belonging to the cachefile.
//
// This function also assumes that the cachefile is not in the process
// of being used by a checkpoint. If a checkpoint is currently happening,
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//
// Because work on a kibbutz is always done by the client thread,
// and this function assumes that no client thread is doing any work
// on the cachefile, we assume that no client thread will be adding jobs
// to this cachefile's kibbutz.
//
// The caller of this function must ensure that there are
// no jobs added to the kibbutz. This implies that the only work other
// threads may be doing is work by the writer threads.
//
// first write out dirty PAIRs
// helper function for cachetable_flush_cachefile, which happens on a close
// writes out the dirty pairs on background threads and returns when
// the writing is done
static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
BACKGROUND_JOB_MANAGER bjm = NULL;
bjm_init(&bjm);
ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here
PAIR p = NULL;
uint32_t i;
// write out dirty PAIRs
uint32_t i;
if (cf) {
for (i = 0, p = cf->cf_head;
i < cf->num_pairs;
......@@ -2432,40 +2420,79 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
ct->list.write_list_unlock();
bjm_wait_for_jobs_to_finish(bjm);
bjm_destroy(bjm);
}
// now that everything is clean, get rid of everything
static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
ct->list.write_list_lock();
if (cf) {
while (cf->num_pairs > 0) {
p = cf->cf_head;
PAIR p = cf->cf_head;
remove_pair_for_close(p, ct);
}
}
else {
while (ct->list.m_n_in_table > 0) {
p = ct->list.m_checkpoint_head;
PAIR p = ct->list.m_checkpoint_head;
remove_pair_for_close(p, ct);
}
}
ct->list.write_list_unlock();
}
static void verify_cachefile_flushed(CACHETABLE ct, CACHEFILE cf) {
// assert here that cachefile is flushed by checking
// pair_list and finding no pairs belonging to this cachefile
// Make a list of pairs that belong to this cachefile.
#ifdef TOKU_DEBUG_PARANOID
if (cf) {
ct->list.write_list_lock();
// assert here that cachefile is flushed by checking
// pair_list and finding no pairs belonging to this cachefile
// Make a list of pairs that belong to this cachefile.
uint32_t i;
PAIR p = NULL;
for (i = 0, p = ct->list.m_checkpoint_head;
i < ct->list.m_n_in_table;
i++, p = p->clock_next)
{
assert(p->cachefile != cf);
}
ct->list.write_list_unlock();
}
#endif
}
ct->list.write_list_unlock();
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL.
// Must be holding cachetable lock on entry.
//
// This function assumes that no client thread is accessing or
// trying to access the cachefile while this function is executing.
// This implies no client thread will be trying to lock any nodes
// belonging to the cachefile.
//
// This function also assumes that the cachefile is not in the process
// of being used by a checkpoint. If a checkpoint is currently happening,
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//
// Because work on a kibbutz is always done by the client thread,
// and this function assumes that no client thread is doing any work
// on the cachefile, we assume that no client thread will be adding jobs
// to this cachefile's kibbutz.
//
// The caller of this function must ensure that there are
// no jobs added to the kibbutz. This implies that the only work other
// threads may be doing is work by the writer threads.
//
// first write out dirty PAIRs
write_dirty_pairs_for_close(ct, cf);
// now that everything is clean, get rid of everything
remove_all_pairs_for_close(ct, cf);
verify_cachefile_flushed(ct, cf);
if (cf) {
bjm_reset(cf->bjm);
}
......@@ -2871,6 +2898,7 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*free_userdata)(CACHEFILE, void*),
void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
......@@ -2879,6 +2907,7 @@ toku_cachefile_set_userdata (CACHEFILE cf,
cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->close_userdata = close_userdata;
cf->free_userdata = free_userdata;
cf->checkpoint_userdata = checkpoint_userdata;
cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
cf->end_checkpoint_userdata = end_checkpoint_userdata;
......@@ -4286,7 +4315,7 @@ void checkpointer::increment_num_txns() {
//
void checkpointer::update_cachefiles() {
CACHEFILE cf;
for(cf = m_cf_list->m_head; cf; cf=cf->next) {
for(cf = m_cf_list->m_active_head; cf; cf=cf->next) {
assert(cf->begin_checkpoint_userdata);
if (cf->for_checkpoint) {
cf->begin_checkpoint_userdata(m_lsn_of_checkpoint_in_progress,
......@@ -4306,7 +4335,7 @@ void checkpointer::begin_checkpoint() {
// 2. Make list of cachefiles to be included in the checkpoint.
// TODO: <CER> How do we remove the non-lock cachetable reference here?
m_cf_list->read_lock();
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
// The caller must serialize open, close, and begin checkpoint.
// So we should never see a closing cachefile here.
// <CER> Is there an assert we can add here?
......@@ -4365,7 +4394,7 @@ void checkpointer::log_begin_checkpoint() {
m_lsn_of_checkpoint_in_progress = begin_lsn;
// Log the list of open dictionaries.
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
assert(cf->log_fassociate_during_checkpoint);
cf->log_fassociate_during_checkpoint(cf, cf->userdata);
}
......@@ -4446,7 +4475,7 @@ void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextr
void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
m_cf_list->read_lock();
uint32_t curr_index = 0;
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
for (CACHEFILE cf = m_cf_list->m_active_head; cf; cf = cf->next) {
if (cf->for_checkpoint) {
assert(curr_index < m_checkpoint_num_files);
checkpoint_cfs[curr_index] = cf;
......@@ -4538,8 +4567,9 @@ void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");
void cachefile_list::init() {
m_head = NULL;
m_active_head = NULL;
m_next_filenum_to_use.fileid = 0;
m_next_hash_id_to_use = 0;
toku_pthread_rwlock_init(&m_lock, NULL);
}
......@@ -4567,7 +4597,7 @@ int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFIL
CACHEFILE extant;
int r;
r = ENOENT;
for (extant = m_head; extant; extant = extant->next) {
for (extant = m_active_head; extant; extant = extant->next) {
if (extant->fname_in_env &&
!strcmp(extant->fname_in_env, iname_in_env)) {
*cf = extant;
......@@ -4584,7 +4614,7 @@ int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
CACHEFILE extant;
int r = ENOENT;
*cf = NULL;
for (extant = m_head; extant; extant = extant->next) {
for (extant = m_active_head; extant; extant = extant->next) {
if (extant->filenum.fileid==filenum.fileid) {
*cf = extant;
r = 0;
......@@ -4596,27 +4626,27 @@ int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
}
void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
cf->next = m_head;
cf->next = m_active_head;
cf->prev = NULL;
if (m_head) {
m_head->prev = cf;
if (m_active_head) {
m_active_head->prev = cf;
}
m_head = cf;
m_active_head = cf;
}
void cachefile_list::remove_cf(CACHEFILE cf) {
write_lock();
invariant(m_head != NULL);
invariant(m_active_head != NULL);
if (cf->next) {
cf->next->prev = cf->prev;
}
if (cf->prev) {
cf->prev->next = cf->next;
}
if (cf == m_head) {
if (cf == m_active_head) {
invariant(cf->prev == NULL);
m_head = cf->next;
m_active_head = cf->next;
}
write_unlock();
}
......@@ -4627,7 +4657,7 @@ FILENUM cachefile_list::reserve_filenum() {
// taking a write lock because we are modifying next_filenum_to_use
write_lock();
try_again:
for (extant = m_head; extant; extant = extant->next) {
for (extant = m_active_head; extant; extant = extant->next) {
if (m_next_filenum_to_use.fileid==extant->filenum.fileid) {
m_next_filenum_to_use.fileid++;
goto try_again;
......@@ -4639,9 +4669,15 @@ try_again:
return filenum;
}
uint32_t cachefile_list::get_new_hash_id_unlocked() {
uint32_t retval = m_next_hash_id_to_use;
m_next_hash_id_to_use++;
return retval;
}
CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) {
CACHEFILE retval = NULL;
for (CACHEFILE extant = m_head; extant; extant = extant->next) {
for (CACHEFILE extant = m_active_head; extant; extant = extant->next) {
if (toku_fileids_are_equal(&extant->fileid, fileid)) {
// Clients must serialize cachefile open, close, and unlink
// So, during open, we should never see a closing cachefile
......@@ -4656,7 +4692,7 @@ exit:
}
void cachefile_list::verify_unused_filenum(FILENUM filenum) {
for (CACHEFILE extant = m_head; extant; extant = extant->next) {
for (CACHEFILE extant = m_active_head; extant; extant = extant->next) {
invariant(extant->filenum.fileid != filenum.fileid);
}
}
......
......@@ -275,6 +275,7 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*free_userdata)(CACHEFILE, void*),
void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
......
......@@ -317,6 +317,11 @@ static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_val
ft_end_checkpoint(cachefile, fd, header_v);
assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
}
}
// maps to cf->free_userdata
static void ft_free(CACHEFILE cachefile UU(), void *header_v) {
FT ft = (FT) header_v;
toku_ft_free(ft);
}
......@@ -392,6 +397,7 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
ft,
ft_log_fassociate_during_checkpoint,
ft_close,
ft_free,
ft_checkpoint,
ft_begin_checkpoint,
ft_end_checkpoint,
......@@ -494,6 +500,7 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
(void*)h,
ft_log_fassociate_during_checkpoint,
ft_close,
ft_free,
ft_checkpoint,
ft_begin_checkpoint,
ft_end_checkpoint,
......
......@@ -136,7 +136,7 @@ void checkpointer_test::test_begin_checkpoint() {
struct cachefile cf;
cf.next = NULL;
cf.for_checkpoint = false;
m_cp.m_cf_list->m_head = &cf;
m_cp.m_cf_list->m_active_head = &cf;
create_dummy_functions(&cf);
m_cp.begin_checkpoint();
......@@ -146,7 +146,7 @@ void checkpointer_test::test_begin_checkpoint() {
// 3. Call checkpoint with MANY cachefiles.
const uint32_t count = 3;
struct cachefile cfs[count];
m_cp.m_cf_list->m_head = &cfs[0];
m_cp.m_cf_list->m_active_head = &cfs[0];
for (uint32_t i = 0; i < count; ++i) {
cfs[i].for_checkpoint = false;
create_dummy_functions(&cfs[i]);
......@@ -196,7 +196,7 @@ void checkpointer_test::test_pending_bits() {
memset(&cf, 0, sizeof(cf));
cf.next = NULL;
cf.for_checkpoint = true;
m_cp.m_cf_list->m_head = &cf;
m_cp.m_cf_list->m_active_head = &cf;
create_dummy_functions(&cf);
CACHEKEY k;
......@@ -341,7 +341,7 @@ void checkpointer_test::test_end_checkpoint() {
ZERO_STRUCT(m_cp);
m_cp.init(&ctbl.list, NULL, &ctbl.ev, &cfl);
m_cp.m_cf_list->m_head = &cf;
m_cp.m_cf_list->m_active_head = &cf;
// 2. Add data before running checkpoint.
const uint32_t count = 6;
......
......@@ -422,6 +422,7 @@ cachetable_test (void) {
NULL,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
&test_begin_checkpoint,
&dummy_end,
......
......@@ -554,6 +554,7 @@ cachetable_test (void) {
NULL,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint
&dummy_end,
......
......@@ -95,6 +95,7 @@ PATENT RIGHTS GRANT:
//
static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { }
static void dummy_free_usr(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
static void dummy_begin(LSN UU(lsn), void* UU(p)) { }
static void dummy_end(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
......@@ -112,6 +113,7 @@ create_dummy_functions(CACHEFILE cf)
ud,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
&dummy_begin,
&dummy_end,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment