Commit f4f743d1 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5211, closes #5215, closes 5224, merge to main

git-svn-id: file:///svn/toku/tokudb@45613 c7de825b-a66e-492c-adef-691d508d4ae1
parent 59cd27a1
......@@ -23,6 +23,7 @@ add_custom_target(
)
set(FT_SOURCES
background_job_manager.c
block_allocator.c
block_table.c
cachetable.c
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <config.h>
#include <stdbool.h>
#include <toku_pthread.h>
#include "kibbutz.h"
#include "background_job_manager.h"
#include "includes.h"
struct background_job_manager_struct {
bool accepting_jobs;
u_int32_t num_jobs;
toku_cond_t jobs_wait;
toku_mutex_t jobs_lock;
};
void bjm_init(BACKGROUND_JOB_MANAGER* pbjm) {
BACKGROUND_JOB_MANAGER XCALLOC(bjm);
toku_mutex_init(&bjm->jobs_lock, 0);
toku_cond_init(&bjm->jobs_wait, NULL);
bjm->accepting_jobs = true;
bjm->num_jobs = 0;
*pbjm = bjm;
}
void bjm_destroy(BACKGROUND_JOB_MANAGER bjm) {
assert(bjm->num_jobs == 0);
toku_cond_destroy(&bjm->jobs_wait);
toku_mutex_destroy(&bjm->jobs_lock);
toku_free(bjm);
}
void bjm_reset(BACKGROUND_JOB_MANAGER bjm) {
assert(bjm->num_jobs == 0);
bjm->accepting_jobs = true;
}
int bjm_add_background_job(BACKGROUND_JOB_MANAGER bjm) {
int ret_val;
toku_mutex_lock(&bjm->jobs_lock);
if (bjm->accepting_jobs) {
bjm->num_jobs++;
ret_val = 0;
}
else {
ret_val = -1;
}
toku_mutex_unlock(&bjm->jobs_lock);
return ret_val;
}
void bjm_remove_background_job(BACKGROUND_JOB_MANAGER bjm){
toku_mutex_lock(&bjm->jobs_lock);
assert(bjm->num_jobs > 0);
bjm->num_jobs--;
if (bjm->num_jobs == 0 && !bjm->accepting_jobs) {
toku_cond_broadcast(&bjm->jobs_wait);
}
toku_mutex_unlock(&bjm->jobs_lock);
}
void bjm_wait_for_jobs_to_finish(BACKGROUND_JOB_MANAGER bjm) {
toku_mutex_lock(&bjm->jobs_lock);
bjm->accepting_jobs = false;
while (bjm->num_jobs > 0) {
toku_cond_wait(&bjm->jobs_wait, &bjm->jobs_lock);
}
toku_mutex_unlock(&bjm->jobs_lock);
}
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ifndef BACKGROUND_JOB_MANAGER_H
#define BACKGROUND_JOB_MANAGER_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
typedef struct background_job_manager_struct *BACKGROUND_JOB_MANAGER;
void bjm_init(BACKGROUND_JOB_MANAGER* bjm);
void bjm_destroy(BACKGROUND_JOB_MANAGER bjm);
void bjm_reset(BACKGROUND_JOB_MANAGER bjm);
int bjm_add_background_job(BACKGROUND_JOB_MANAGER bjm);
void bjm_remove_background_job(BACKGROUND_JOB_MANAGER bjm);
void bjm_wait_for_jobs_to_finish(BACKGROUND_JOB_MANAGER bjm);
#endif
......@@ -22,6 +22,7 @@
#include "minicron.h"
#include "log-internal.h"
#include "kibbutz.h"
#include "background_job_manager.h"
///////////////////////////////////////////////////////////////////////////////////
// Engine status
......@@ -36,8 +37,6 @@ static u_int64_t cachetable_miss;
static u_int64_t cachetable_misstime; // time spent waiting for disk read
static u_int64_t cachetable_puts; // how many times has a newly created node been put into the cachetable?
static u_int64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static u_int64_t cachetable_maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called?
static u_int64_t cachetable_maybe_get_and_pin_hits; // how many times has get_and_pin(_clean) returned with a node?
static u_int64_t cachetable_evictions;
static u_int64_t cleaner_executions; // number of times the cleaner thread's loop has executed
......@@ -60,8 +59,6 @@ status_init(void) {
STATUS_INIT(CT_MISSTIME, UINT64, "miss time");
STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)");
STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches");
STATUS_INIT(CT_MAYBE_GET_AND_PINS, UINT64, "maybe_get_and_pin");
STATUS_INIT(CT_MAYBE_GET_AND_PIN_HITS, UINT64, "maybe_get_and_pin hits");
STATUS_INIT(CT_SIZE_CURRENT, UINT64, "size current");
STATUS_INIT(CT_SIZE_LIMIT, UINT64, "size limit");
STATUS_INIT(CT_SIZE_MAX, UINT64, "size max");
......@@ -99,8 +96,6 @@ struct ctpair {
enum cachetable_dirty dirty;
char verify_flag; // Used in verify_cachetable()
BOOL remove_me; // write_pair
u_int32_t fullhash;
CACHETABLE_FLUSH_CALLBACK flush_callback;
......@@ -115,7 +110,6 @@ struct ctpair {
PAIR hash_chain;
u_int32_t count; // clock count
BOOL checkpoint_pending; // If this is on, then we have got to write the pair out to disk before modifying it.
PAIR pending_next;
PAIR pending_prev;
......@@ -125,8 +119,6 @@ struct ctpair {
struct workqueue *cq; // writers sometimes return ctpair's using this queue
struct workitem asyncwork; // work item for the worker threads
struct workitem checkpoint_asyncwork; // work item for the worker threads
u_int32_t refs; // References that prevent destruction
int already_removed; // If a pair is removed from the cachetable, but cannot be freed because refs>0, this is set.
struct toku_list next_for_cachefile; // link in the cachefile list
};
......@@ -142,26 +134,14 @@ static PAIR_ATTR const zero_attr = {
static void maybe_flush_some (CACHETABLE ct, long size);
static inline void
ctpair_add_ref(PAIR p) {
assert(!p->already_removed);
p->refs++;
}
static inline void ctpair_destroy(PAIR p) {
assert(p->refs>0);
p->refs--;
if (p->refs==0) {
nb_mutex_destroy(&p->value_nb_mutex);
nb_mutex_destroy(&p->disk_nb_mutex);
toku_free(p);
}
nb_mutex_destroy(&p->value_nb_mutex);
nb_mutex_destroy(&p->disk_nb_mutex);
toku_free(p);
}
// The cachetable is as close to an ENV as we get.
// There are 3 locks, must be taken in this order
// cachetable_mutex
// cachefiles_mutex
struct cachetable {
u_int32_t n_in_table; // number of pairs in the hash table
u_int32_t table_size; // number of buckets in the hash table
......@@ -177,7 +157,6 @@ struct cachetable {
int64_t size_max; // high water mark of size_current (max value size_current ever had)
TOKULOGGER logger;
toku_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pairs
toku_mutex_t cachefiles_mutex; // lock that protects the cachefiles list
struct workqueue wq; // async work queue
THREADPOOL threadpool; // pool of worker threads
struct workqueue checkpoint_wq;
......@@ -210,17 +189,9 @@ struct cachetable {
int64_t size_rollback;
int64_t size_cachepressure;
// variables used by the checkpoint thread to know
// variable used by the checkpoint thread to know
// when all work induced by cloning on client threads is done
// when a client thread clones a PAIR and places it on
// a background thread to be written out, n_checkpoint_clones_running
// is incremented. On the background thread, when the checkpointing
// is completed, n_checkpoint_clones_running is decremented.
// When the checkpoint thread uses clones_background_wait for
// n_checkpoint_clones_running to go to zero, it knows that
// the checkpoint is complete
u_int32_t n_checkpoint_clones_running;
toku_cond_t clones_background_wait;
BACKGROUND_JOB_MANAGER checkpoint_clones_bjm;
};
......@@ -233,8 +204,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
STATUS_VALUE(CT_PUTS) = cachetable_puts;
STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
STATUS_VALUE(CT_MAYBE_GET_AND_PINS) = cachetable_maybe_get_and_pins;
STATUS_VALUE(CT_MAYBE_GET_AND_PIN_HITS) = cachetable_maybe_get_and_pin_hits;
STATUS_VALUE(CT_SIZE_CURRENT) = ct->size_current;
STATUS_VALUE(CT_SIZE_LIMIT) = ct->size_limit;
STATUS_VALUE(CT_SIZE_MAX) = ct->size_max;
......@@ -262,18 +231,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
// verifying that we just incremented it with the matching BEGIN macro.
#define END_CRITICAL_REGION {invariant(ct->checkpoint_prohibited > 0); __sync_fetch_and_sub(&ct->checkpoint_prohibited, 1);}
// Lock the cachefiles. Any function that traverses or modifies the
// list of cachefiles must hold this lock.
static inline void cachefiles_lock(CACHETABLE ct) {
toku_mutex_lock(&ct->cachefiles_mutex);
}
// Unlock the cachefiles
static inline void cachefiles_unlock(CACHETABLE ct) {
toku_mutex_unlock(&ct->cachefiles_mutex);
}
// Lock the cachetable. Used for a variety of purposes. TODO: like what?
static inline void cachetable_lock(CACHETABLE ct __attribute__((unused))) {
toku_mutex_lock(ct->mutex);
......@@ -306,10 +263,7 @@ struct cachefile {
CACHEFILE next_in_checkpoint;
struct toku_list pairs_for_cachefile; // list of pairs for this cachefile
BOOL for_checkpoint; //True if part of the in-progress checkpoint
BOOL is_closing; /* TRUE if a cachefile is being close/has been closed. */
bool is_flushing; // during cachetable_flush_cachefile, this must be
// true, to prevent the cleaner thread from messing
// with nodes in that cachefile
// If set and the cachefile closes, the file will be removed.
// Clients must not operate on the cachefile after setting this,
// nor attempt to open any cachefile with the same fname (dname)
......@@ -333,67 +287,27 @@ struct cachefile {
LSN most_recent_global_checkpoint_that_finished_early;
LSN for_local_checkpoint;
enum cachefile_checkpoint_state checkpoint_state;
int n_background_jobs; // how many jobs in the cachetable's kibbutz or
// on the cleaner thread (anything
// cachetable_flush_cachefile should wait on)
// are working on this cachefile. Each job should, at the
// end, obtain the cachetable mutex, decrement
// this variable, and broadcast the
// kibbutz_wait condition variable to let
// anyone else know it's happened.
toku_cond_t background_wait; // Any job that finishes a
// background job should
// broadcast on this cond
// variable (holding the
// cachetable mutex). That way
// when closing the cachefile,
// we can get a notification
// when things finish.
BACKGROUND_JOB_MANAGER bjm;
};
// FIXME global with no toku prefix
void add_background_job(CACHEFILE cf, bool already_locked)
void remove_background_job_from_cf(CACHEFILE cf)
{
if (!already_locked) {
cachetable_lock(cf->cachetable);
}
cf->n_background_jobs++;
if (!already_locked) {
cachetable_unlock(cf->cachetable);
}
}
// FIXME global with no toku prefix
void remove_background_job(CACHEFILE cf, bool already_locked)
{
if (!already_locked) {
cachetable_lock(cf->cachetable);
}
assert(cf->n_background_jobs>0);
cf->n_background_jobs--;
toku_cond_broadcast(&cf->background_wait);
if (!already_locked) {
cachetable_unlock(cf->cachetable);
}
bjm_remove_background_job(cf->bjm);
}
// FIXME global with no toku prefix
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
// The function f must call remove_background_job when it completes
// The function f must call remove_background_job_from_cf when it completes
{
add_background_job(cf, false);
int r = bjm_add_background_job(cf->bjm);
// if client should is adding a background job, then it must be done
// at a time when the manager is accepting background jobs, otherwise
// the client is screwing up
assert_zero(r);
toku_kibbutz_enq(cf->cachetable->kibbutz, f, extra);
}
static void wait_on_background_jobs_to_finish (CACHEFILE cf) {
cachetable_lock(cf->cachetable);
while (cf->n_background_jobs>0) {
toku_cond_wait(&cf->background_wait, cf->cachetable->mutex);
}
cachetable_unlock(cf->cachetable);
}
static int
checkpoint_thread (void *cachetable_v)
// Effect: If checkpoint_period>0 thn periodically run a checkpoint.
......@@ -477,7 +391,6 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
toku_init_workers(&ct->wq, &ct->threadpool, 1);
toku_init_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool, 8);
ct->mutex = workqueue_lock_ref(&ct->wq);
toku_mutex_init(&ct->cachefiles_mutex, 0);
ct->kibbutz = toku_kibbutz_create(toku_os_get_number_active_processors());
......@@ -485,7 +398,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
toku_minicron_setup(&ct->cleaner, 0, toku_cleaner_thread, ct); // default is no cleaner, for now
ct->cleaner_iterations = 1; // default is one iteration
ct->env_dir = toku_xstrdup(".");
toku_cond_init(&ct->clones_background_wait, NULL);
bjm_init(&ct->checkpoint_clones_bjm);
*result = ct;
return 0;
}
......@@ -522,7 +435,7 @@ toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) {
// Once the close has finished, there must not be a cachefile with that name
// in the cachetable.
int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) {
cachefiles_lock(ct);
cachetable_lock(ct);
CACHEFILE extant;
int r;
r = ENOENT;
......@@ -534,27 +447,26 @@ int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CAC
break;
}
}
cachefiles_unlock(ct);
cachetable_unlock(ct);
return r;
}
// What cachefile goes with particular fd?
// This function can only be called if the brt is still open, so file must
// still be open and cannot be in the is_closing state.
// still be open
int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
cachefiles_lock(ct);
cachetable_lock(ct);
CACHEFILE extant;
int r = ENOENT;
*cf = NULL;
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (extant->filenum.fileid==filenum.fileid) {
assert(!extant->is_closing);
*cf = extant;
r = 0;
break;
}
}
cachefiles_unlock(ct);
cachetable_unlock(ct);
return r;
}
......@@ -580,7 +492,6 @@ toku_cachetable_reserve_filenum(CACHETABLE ct) {
FILENUM filenum;
invariant(ct);
cachetable_lock(ct);
cachefiles_lock(ct);
try_again:
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) {
......@@ -590,7 +501,6 @@ try_again:
}
filenum = next_filenum_to_use;
next_filenum_to_use.fileid++;
cachefiles_unlock(ct);
cachetable_unlock(ct);
return filenum;
}
......@@ -609,13 +519,11 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
return r;
}
cachetable_lock(ct);
cachefiles_lock(ct);
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
// Clients must serialize cachefile open, close, and unlink
// So, during open, we should never see a closing cachefile
// or one that has been marked as unlink on close.
assert(!extant->is_closing);
assert(!extant->unlink_on_close);
// Reuse an existing cachefile and close the caller's fd, whose
......@@ -647,19 +555,17 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
newcf->for_local_checkpoint = ZERO_LSN;
newcf->checkpoint_state = CS_NOT_IN_PROGRESS;
toku_cond_init(&newcf->background_wait, NULL);
bjm_init(&newcf->bjm);
toku_list_init(&newcf->pairs_for_cachefile);
*cfptr = newcf;
r = 0;
}
exit:
cachefiles_unlock(ct);
cachetable_unlock(ct);
return r;
}
static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf);
static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf);
//TEST_ONLY_FUNCTION
int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) {
......@@ -731,13 +637,7 @@ static CACHEFILE remove_cf_from_list_locked (CACHEFILE cf, CACHEFILE list) {
static void remove_cf_from_cachefiles_list (CACHEFILE cf) {
CACHETABLE ct = cf->cachetable;
cachefiles_lock(ct);
ct->cachefiles = remove_cf_from_list_locked(cf, ct->cachefiles);
cachefiles_unlock(ct);
}
void toku_cachefile_wait_for_background_work_to_quiesce(CACHEFILE cf) {
wait_on_background_jobs_to_finish(cf);
}
int
......@@ -746,19 +646,8 @@ toku_cachefile_close(CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, LSN
CACHEFILE cf = *cfp;
CACHETABLE ct = cf->cachetable;
// Mark this cachefile as flushing so that cleaner threads know
// not to operate on any of its pairs.
cachetable_lock(ct);
cf->is_flushing = true;
cachetable_unlock(ct);
// There may be reader, writer, or flusher threads on the kibbutz
// that need to do work on pairs for this cf. Before we can close
// the underlying file, we need to wait for them to finish. No new
// work should start because clients of the cachetable are not supposed
// to use a cachefile in parallel with a close, or afterwards.
wait_on_background_jobs_to_finish(cf);
bjm_wait_for_jobs_to_finish(cf->bjm);
// Hold the cachetable lock while we check some invariants and
// flush the cachefile.
cachetable_lock(ct);
......@@ -769,11 +658,6 @@ toku_cachefile_close(CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, LSN
assert(!cf->next_in_checkpoint);
assert(!cf->for_checkpoint);
// Help enforce the client contract that open/close should never
// run in parallel.
assert(!cf->is_closing);
cf->is_closing = true;
// Flush the cachefile and remove all of its pairs from the cachetable
cachetable_flush_cachefile(ct, cf);
assert(toku_list_empty(&cf->pairs_for_cachefile));
......@@ -785,7 +669,8 @@ toku_cachefile_close(CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, LSN
}
remove_cf_from_cachefiles_list(cf);
toku_cond_destroy(&cf->background_wait);
bjm_destroy(cf->bjm);
cf->bjm = NULL;
// Don't hold the cachetable lock during fsync/close/unlink, etc
cachetable_unlock(ct);
......@@ -820,12 +705,9 @@ toku_cachefile_close(CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, LSN
// cachefile.
//
int toku_cachefile_flush (CACHEFILE cf) {
bjm_wait_for_jobs_to_finish(cf->bjm);
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
cf->is_flushing = true;
cachetable_unlock(ct);
wait_on_background_jobs_to_finish(cf);
cachetable_lock(ct);
cachetable_flush_cachefile(ct, cf);
cachetable_unlock(ct);
return 0;
......@@ -886,7 +768,6 @@ static void cachetable_rehash (CACHETABLE ct, u_int32_t newtable_size) {
#define CLOCK_SATURATION 15
#define CLOCK_INITIAL_COUNT 3
#define CLOCK_WRITE_OUT 1
static void pair_remove (CACHETABLE ct, PAIR p) {
if (p->clock_prev == p) {
......@@ -1008,7 +889,6 @@ static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
ct->table[h] = remove_from_hash_chain (p, ct->table[h]);
}
cachetable_remove_pair_attr(ct, p->attr);
p->already_removed = TRUE;
}
static void cachetable_free_pair(CACHETABLE ct, PAIR p) {
......@@ -1042,8 +922,7 @@ static void cachetable_free_pair(CACHETABLE ct, PAIR p) {
// The sole purpose of this function is to remove the node, so the write_me
// argument to the flush callback is false, and the flush callback won't do
// anything except destroy the node.
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p, BOOL* destroyed) {
*destroyed = FALSE;
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
if (nb_mutex_users(&p->value_nb_mutex) == 0) {
// assumption is that if we are about to remove the pair
// that no one has grabbed the disk_nb_mutex,
......@@ -1053,7 +932,6 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p, BOOL*
assert(p->cloned_value_data == NULL);
cachetable_remove_pair(ct, p);
cachetable_free_pair(ct, p);
*destroyed = TRUE;
}
}
......@@ -1149,12 +1027,10 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
// maybe removing the pair from the cachetable if there are no
// references to it
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove, BOOL* destroyed) {
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p) {
p->cq = 0;
nb_mutex_unlock(&p->value_nb_mutex);
if (do_remove) {
cachetable_maybe_remove_and_free_pair(ct, p, destroyed);
}
cachetable_maybe_remove_and_free_pair(ct, p);
}
// Write a pair to storage
......@@ -1162,7 +1038,7 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov
// the pair dirty state is adjusted, and the write is completed. The keep_me
// boolean is true, so the pair is not yet evicted from the cachetable.
// Requires: This thread must hold the write lock for the pair.
static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
static void cachetable_evict_pair(CACHETABLE ct, PAIR p) {
long old_size = p->attr.size;
// this function may change p->attr.size, so we saved
// the estimate we must have put into ct->evicting_size above
......@@ -1170,36 +1046,31 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
// maybe wakeup any stalled writers when the pending writes fall below
// 1/8 of the size of the cachetable
if (remove_me) {
ct->size_evicting -= old_size;
assert(ct->size_evicting >= 0);
if (8*ct->size_evicting <= ct->size_current) {
workqueue_wakeup_write(&ct->wq, 0);
}
}
// stuff it into a completion queue for delayed completion if a completion queue exists
// otherwise complete the write now
if (p->cq)
workqueue_enq(p->cq, &p->asyncwork, 1);
else {
BOOL destroyed;
cachetable_complete_write_pair(ct, p, remove_me, &destroyed);
ct->size_evicting -= old_size;
assert(ct->size_evicting >= 0);
if (8*ct->size_evicting <= ct->size_current) {
workqueue_wakeup_write(&ct->wq, 0);
}
assert(!p->cq);
cachetable_complete_write_pair(ct, p);
}
// Worker thread function to write a pair from memory to its cachefile
// As of now, the writer thread NEVER evicts, hence passing FALSE
// for the third parameter to cachetable_write_pair
static void cachetable_writer(WORKITEM wi) {
// Worker thread function to writes and evicts a pair from memory to its cachefile
static void cachetable_evicter(WORKITEM wi) {
PAIR p = workitem_arg(wi);
CACHETABLE ct = p->cachefile->cachetable;
CACHEFILE cf = p->cachefile;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
cachetable_write_pair(ct, p, p->remove_me);
cachetable_evict_pair(ct, p);
cachetable_unlock(ct);
bjm_remove_background_job(cf->bjm);
}
// CT lock held on entry
// background job has been added for p->cachefile on entry
// responsibility of this function to make sure that background job is removed
static void try_evict_pair(CACHETABLE ct, PAIR p) {
CACHEFILE cf = p->cachefile;
// evictions without a write or unpinned pair's that are clean
// can be run in the current thread
......@@ -1218,45 +1089,18 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
// which may be expensive. Hence, if either is true, we
// do the eviction on a writer thread
if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) {
cachetable_write_pair(ct, p, TRUE);
cachetable_evict_pair(ct, p);
bjm_remove_background_job(cf->bjm);
}
else {
p->remove_me = TRUE;
WORKITEM wi = &p->asyncwork;
workitem_init(wi, cachetable_writer, p);
//responsibility of cachetable_evicter to remove background job
workitem_init(wi, cachetable_evicter, p);
workqueue_enq(&ct->wq, wi, 0);
}
}
}
// flush and remove a pair from the cachetable. the callbacks are run by a thread in
// a thread pool.
static void flush_and_maybe_remove (CACHETABLE ct, PAIR p) {
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
// this needs to be done here regardless of whether the eviction occurs on the main thread or on
// a writer thread, because there may be a completion queue that needs access to this information
WORKITEM wi = &p->asyncwork;
// we are not going to remove if we are posting a dirty node
// to the writer thread.
// In that case, we will let the caller decide if they want to remove it.
// We may be able to just let the writer thread evict the node,
// but I (Zardosht) do not understand the caller well enough
// so I am hesitant to change it.
p->remove_me = FALSE;
workitem_init(wi, cachetable_writer, p);
// evictions without a write or unpinned pair's that are clean
// can be run in the current thread
if (!nb_mutex_writers(&p->value_nb_mutex) && !p->dirty) {
assert(ct->size_evicting >= 0);
ct->size_evicting += p->attr.size;
assert(ct->size_evicting >= 0);
cachetable_write_pair(ct, p, TRUE);
}
else {
workqueue_enq(&ct->wq, wi, 0);
}
}
static void do_partial_eviction(CACHETABLE ct, PAIR p) {
PAIR_ATTR new_attr;
PAIR_ATTR old_attr = p->attr;
......@@ -1273,33 +1117,91 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
if (8*ct->size_evicting <= ct->size_current) {
workqueue_wakeup_write(&ct->wq, 0);
}
if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_unlock(&p->value_nb_mutex);
}
assert(!p->cq);
nb_mutex_unlock(&p->value_nb_mutex);
}
static void cachetable_partial_eviction(WORKITEM wi) {
PAIR p = workitem_arg(wi);
CACHETABLE ct = p->cachefile->cachetable;
CACHEFILE cf = p->cachefile;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
do_partial_eviction(ct,p);
cachetable_unlock(ct);
bjm_remove_background_job(cf->bjm);
}
// cachetable lock held on entry
// run eviction on PAIR, may be partial eviction or full eviction
static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) {
bool ret_val = false;
// function meant to be called on PAIR that is not being accessed right now
assert(nb_mutex_users(&curr_in_clock->value_nb_mutex) == 0);
assert(nb_mutex_users(&curr_in_clock->disk_nb_mutex) == 0);
CACHEFILE cf = curr_in_clock->cachefile;
int r = bjm_add_background_job(cf->bjm);
if (r) {
goto exit;
}
ret_val = true;
if (curr_in_clock->count > 0) {
curr_in_clock->count--;
// call the partial eviction callback
nb_mutex_lock(&curr_in_clock->value_nb_mutex, ct->mutex);
void *value = curr_in_clock->value_data;
void* disk_data = curr_in_clock->disk_data;
void *write_extraargs = curr_in_clock->write_extraargs;
enum partial_eviction_cost cost;
long bytes_freed_estimate = 0;
curr_in_clock->pe_est_callback(
value,
disk_data,
&bytes_freed_estimate,
&cost,
write_extraargs
);
if (cost == PE_CHEAP) {
curr_in_clock->size_evicting_estimate = 0;
do_partial_eviction(ct, curr_in_clock);
bjm_remove_background_job(cf->bjm);
}
else if (cost == PE_EXPENSIVE) {
// only bother running an expensive partial eviction
// if it is expected to free space
if (bytes_freed_estimate > 0) {
curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
ct->size_evicting += bytes_freed_estimate;
WORKITEM wi = &curr_in_clock->asyncwork;
// responsibility of cachetable_partial_eviction to remove background job
workitem_init(wi, cachetable_partial_eviction, curr_in_clock);
workqueue_enq(&ct->wq, wi, 0);
}
else {
assert(!curr_in_clock->cq);
nb_mutex_unlock(&curr_in_clock->value_nb_mutex);
bjm_remove_background_job(cf->bjm);
}
}
else {
assert(FALSE);
}
}
else {
// responsibility of try_evict_pair to eventually remove background job
try_evict_pair(ct, curr_in_clock);
}
exit:
return ret_val;
}
static void maybe_flush_some (CACHETABLE ct, long size) {
if (size + ct->size_current <= ct->size_limit + ct->size_evicting) return;
//
// These variables will help us detect if everything in the clock is currently being accessed.
// We must detect this case otherwise we will end up in an infinite loop below.
//
if (size + ct->size_current <= ct->size_limit + ct->size_evicting) return;
CACHEKEY curr_cachekey;
curr_cachekey.b = INT64_MAX; // create initial value so compiler does not complain
FILENUM curr_filenum;
......@@ -1308,78 +1210,31 @@ static void maybe_flush_some (CACHETABLE ct, long size) {
while ((ct->clock_head) && (size + ct->size_current > ct->size_limit + ct->size_evicting)) {
PAIR curr_in_clock = ct->clock_head;
if (set_val &&
curr_in_clock->key.b == curr_cachekey.b &&
curr_in_clock->cachefile->filenum.fileid == curr_filenum.fileid)
{
// we have identified a cycle where everything in the clock is in use
// do not return an error
// just let memory be overfull
goto exit;
}
if (nb_mutex_users(&curr_in_clock->value_nb_mutex) || nb_mutex_users(&curr_in_clock->disk_nb_mutex)) {
if (set_val &&
curr_in_clock->key.b == curr_cachekey.b &&
curr_in_clock->cachefile->filenum.fileid == curr_filenum.fileid)
{
// we have identified a cycle where everything in the clock is in use
// do not return an error
// just let memory be overfull
goto exit;
}
else {
if (!set_val) {
set_val = TRUE;
curr_cachekey = ct->clock_head->key;
curr_filenum = ct->clock_head->cachefile->filenum;
}
if (!set_val) {
set_val = TRUE;
curr_cachekey = ct->clock_head->key;
curr_filenum = ct->clock_head->cachefile->filenum;
}
}
else {
set_val = FALSE;
if (curr_in_clock->count > 0) {
curr_in_clock->count--;
// call the partial eviction callback
nb_mutex_lock(&curr_in_clock->value_nb_mutex, ct->mutex);
void *value = curr_in_clock->value_data;
void* disk_data = curr_in_clock->disk_data;
void *write_extraargs = curr_in_clock->write_extraargs;
enum partial_eviction_cost cost;
long bytes_freed_estimate = 0;
curr_in_clock->pe_est_callback(
value,
disk_data,
&bytes_freed_estimate,
&cost,
write_extraargs
);
if (cost == PE_CHEAP) {
curr_in_clock->size_evicting_estimate = 0;
do_partial_eviction(ct, curr_in_clock);
}
else if (cost == PE_EXPENSIVE) {
// only bother running an expensive partial eviction
// if it is expected to free space
if (bytes_freed_estimate > 0) {
curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
ct->size_evicting += bytes_freed_estimate;
WORKITEM wi = &curr_in_clock->asyncwork;
workitem_init(wi, cachetable_partial_eviction, curr_in_clock);
workqueue_enq(&ct->wq, wi, 0);
}
else {
// maybe_flush_some is always run on a client thread
// As a result, the cachefile cannot be in process of closing,
// and therefore a completion queue is not set up for
// closing the cachefile
// Also, because we locked this PAIR when there were no
// other users trying to get access, no thread running
// unpin_and_remove may have gotten in here and
// set up a completion queue.
// So, a completion queue cannot exist
assert(!curr_in_clock->cq);
nb_mutex_unlock(&curr_in_clock->value_nb_mutex);
}
}
else {
assert(FALSE);
}
bool eviction_run = run_eviction_on_pair(curr_in_clock, ct);
if (eviction_run) {
set_val = FALSE;
}
else {
try_evict_pair(ct, curr_in_clock);
else if (!set_val) {
set_val = TRUE;
curr_cachekey = ct->clock_head->key;
curr_filenum = ct->clock_head->cachefile->filenum;
}
}
// at this point, either curr_in_clock is still in the list because it has not been fully evicted,
......@@ -1416,7 +1271,6 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
PAIR MALLOC(p);
assert(p);
memset(p, 0, sizeof *p);
ctpair_add_ref(p);
p->cachefile = cachefile;
p->key = key;
p->value_data = value;
......@@ -1434,7 +1288,6 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
p->write_extraargs = write_callback.write_extraargs;
p->fullhash = fullhash;
p->clock_next = p->clock_prev = 0;
p->remove_me = FALSE;
nb_mutex_init(&p->value_nb_mutex);
nb_mutex_init(&p->disk_nb_mutex);
p->cq = 0;
......@@ -1570,10 +1423,7 @@ static void checkpoint_cloned_pair(WORKITEM wi) {
TRUE //is_clone
);
nb_mutex_unlock(&p->disk_nb_mutex);
ct->n_checkpoint_clones_running--;
if (ct->n_checkpoint_clones_running == 0) {
toku_cond_broadcast(&ct->clones_background_wait);
}
bjm_remove_background_job(ct->checkpoint_clones_bjm);
cachetable_unlock(ct);
}
......@@ -1604,7 +1454,8 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
assert(p->cloned_value_data);
// place it on the background thread and continue
// responsibility of writer thread to release disk_nb_mutex
ct->n_checkpoint_clones_running++;
int r = bjm_add_background_job(ct->checkpoint_clones_bjm);
assert_zero(r);
checkpoint_cloned_pair_on_writer_thread(ct, p);
// possibly run eviction because act of cloning adds
// to ct->size_current, we don't do it in
......@@ -1661,7 +1512,7 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
// now release value_nb_mutex, before we write the PAIR out
// so that the PAIR is available to client threads
nb_mutex_unlock(&p->value_nb_mutex); // didn't call cachetable_write_pair so we have to unlock it ourselves.
nb_mutex_unlock(&p->value_nb_mutex); // didn't call cachetable_evict_pair so we have to unlock it ourselves.
if (p->clone_callback) {
// note that pending lock is not needed here because
// we KNOW we are in the middle of a checkpoint
......@@ -1872,21 +1723,9 @@ do_partial_fetch(
p->attr = new_attr;
cachetable_change_pair_attr(ct, old_attr, new_attr);
nb_mutex_unlock(&p->disk_nb_mutex);
if (keep_pair_locked) {
// if the caller wants the pair to remain locked
// that means the caller requests continued
// ownership of the PAIR, so there better not
// be a cq asking to transfer ownership
assert(!p->cq);
}
else {
if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_unlock(&p->value_nb_mutex);
}
assert(!p->cq);
if (!keep_pair_locked) {
nb_mutex_unlock(&p->value_nb_mutex);
}
}
......@@ -1996,21 +1835,9 @@ static void cachetable_fetch_pair(
p->attr = attr;
cachetable_add_pair_attr(ct, attr);
nb_mutex_unlock(&p->disk_nb_mutex);
if (keep_pair_locked) {
// if the caller wants the pair to remain locked
// that means the caller requests continued
// ownership of the PAIR, so there better not
// be a cq asking to transfer ownership
assert(!p->cq);
}
else {
if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_unlock(&p->value_nb_mutex);
}
assert(!p->cq);
if (!keep_pair_locked) {
nb_mutex_unlock(&p->value_nb_mutex);
}
if (0) printf("%s:%d %"PRId64" complete\n", __FUNCTION__, __LINE__, key.b);
}
......@@ -2195,20 +2022,17 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
PAIR p;
int r = -1;
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean
p->dirty &&
nb_mutex_users(&p->value_nb_mutex) == 0
) {
cachetable_maybe_get_and_pin_hits++;
// because nb_mutex_users is 0, this is fast
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
*value = p->value_data;
pair_touch(p);
r = 0;
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
}
break;
}
......@@ -2225,18 +2049,15 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
PAIR p;
int r = -1;
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean (if the pin would be used for writes. If would be used for read-only we could return it, but that would increase complexity)
nb_mutex_users(&p->value_nb_mutex) == 0
) {
cachetable_maybe_get_and_pin_hits++;
// because nb_mutex_users is 0, this is fast
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
*value = p->value_data;
r = 0;
//printf("%s:%d cachetable_maybe_get_and_pin_clean(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
}
break;
}
......@@ -2450,11 +2271,9 @@ struct cachefile_partial_prefetch_args {
// Worker thread function to read a pair from a cachefile to memory
static void cachetable_reader(WORKITEM wi) {
struct cachefile_prefetch_args* cpargs = workitem_arg(wi);
CACHETABLE ct = cpargs->p->cachefile->cachetable;
CACHEFILE cf = cpargs->p->cachefile;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
// TODO: find a way to properly pass some information for read_extraargs
// This is only called in toku_cachefile_prefetch, by putting it on a workqueue
// The problem is described in comments in toku_cachefile_prefetch
cachetable_fetch_pair(
ct,
cpargs->p->cachefile,
......@@ -2464,15 +2283,18 @@ static void cachetable_reader(WORKITEM wi) {
FALSE
);
cachetable_unlock(ct);
bjm_remove_background_job(cf->bjm);
toku_free(cpargs);
}
static void cachetable_partial_reader(WORKITEM wi) {
struct cachefile_partial_prefetch_args *cpargs = workitem_arg(wi);
CACHETABLE ct = cpargs->p->cachefile->cachetable;
CACHEFILE cf = cpargs->p->cachefile;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, FALSE);
cachetable_unlock(ct);
bjm_remove_background_job(cf->bjm);
toku_free(cpargs);
}
......@@ -2485,16 +2307,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
BOOL *doing_prefetch)
// Effect: See the documentation for this function in cachetable.h
{
// TODO: Fix prefetching, as part of ticket 3635
// Here is the cachetable's reason why we are not doing prefetching in Maxwell.
// The fetch_callback requires data that is only valid in the caller's thread,
// namely, a struct that the caller allocates that contains information
// on what pieces of the node will be needed. This data is not necessarily
// valid when the prefetch thread gets around to trying to prefetch the node
// If we pass this data to another thread, we need a mechanism for freeing it.
// It may be another callback. That is way too many callbacks that are being used
// Fixing this in a clean, simple way requires some thought.
if (0) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, key.b);
int r = 0;
if (doing_prefetch) {
*doing_prefetch = FALSE;
}
......@@ -2512,6 +2325,8 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
// if not found then create a pair in the READING state and fetch it
if (p == 0) {
cachetable_prefetches++;
r = bjm_add_background_job(cf->bjm);
assert_zero(r);
p = cachetable_insert_at(
ct,
cf,
......@@ -2545,6 +2360,8 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
BOOL partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
if (partial_fetch_required) {
r = bjm_add_background_job(cf->bjm);
assert_zero(r);
struct cachefile_partial_prefetch_args *MALLOC(cpargs);
cpargs->p = p;
cpargs->pf_callback = pf_callback;
......@@ -2645,23 +2462,29 @@ void toku_cachetable_verify (CACHETABLE ct) {
cachetable_unlock(ct);
}
static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf) {
u_int32_t i;
// Check it two ways
// First way: Look through all the hash chains
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(p->cachefile!=cf);
}
}
// Second way: Look through the LRU list.
{
PAIR p;
for (p=ct->clock_head; p; p=p->clock_next) {
assert(p->cachefile!=cf);
}
}
struct pair_flush_for_close{
PAIR p;
BACKGROUND_JOB_MANAGER bjm;
};
static void cachetable_flush_pair_for_close(WORKITEM wi) {
struct pair_flush_for_close *args = workitem_arg(wi);
PAIR p = args->p;
CACHEFILE cf = p->cachefile;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
PAIR_ATTR attr;
cachetable_only_write_locked_data(
ct,
p,
FALSE, // not for a checkpoint, as we assert above
&attr,
FALSE // not a clone
);
p->dirty = CACHETABLE_CLEAN;
cachetable_unlock(ct);
bjm_remove_background_job(args->bjm);
toku_free(args);
}
// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
......@@ -2678,7 +2501,6 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
unsigned nfound = 0;
//
// Because work on a kibbutz is always done by the client thread,
// and this function assumes that no client thread is doing any work
......@@ -2689,52 +2511,19 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
// no jobs added to the kibbutz. This implies that the only work other
// threads may be doing is work by the writer threads.
//
// Additionally, the cachetable lock is held on entry to this
// function, so the cleaner thread cannot start any new work either.
//
// No other threads (other than kibbutzim and the cleaner thread) do
// background work we care about as the system is today.
//
if (cf) {
assert(cf->n_background_jobs == 0);
}
struct workqueue cq;
workqueue_init(&cq);
// find all of the pairs owned by a cachefile and redirect their completion
// to a completion queue. If an unlocked PAIR is dirty, flush and remove
// the PAIR. Locked PAIRs are on either a reader/writer thread
// and therefore will be placed on the completion queue.
//
// The assumptions above lead to this reasoning. All pairs belonging to
// this cachefile are either:
// - unlocked
// - locked and on a writer thread (or possibly on a checkpoint thread?).
// We find all the pairs owned by the cachefile and do the following:
// - if the PAIR is clean and unlocked, then remove the PAIR
// - if the PAIR is dirty and unlocked, write the PAIR to disk on a writer thread
// - then, wait on all pairs that are on writer threads (includes pairs we just
// placed on the writer thread along with pairs that were on the writer thread
// when the function started).
// - Once the writer thread is done with a PAIR, remove it
//
unsigned i;
unsigned num_pairs = 0;
unsigned list_size = 256;
PAIR *list = NULL;
XMALLOC_N(list_size, list);
//It is not safe to loop through the table (and hash chains) if you can
//release the cachetable lock at any point within.
//Make a list of pairs that belong to this cachefile.
//Add a reference to them.
if (cf == NULL) {
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p = p->hash_chain) {
if (cf == 0 || p->cachefile==cf) {
ctpair_add_ref(p);
if (num_pairs == list_size) {
list_size *= 2;
XREALLOC_N(list_size, list);
......@@ -2743,10 +2532,10 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
}
}
}
} else {
}
else {
for (struct toku_list *next_pair = cf->pairs_for_cachefile.next; next_pair != &cf->pairs_for_cachefile; next_pair = next_pair->next) {
PAIR p = toku_list_struct(next_pair, struct ctpair, next_for_cachefile);
ctpair_add_ref(p);
if (num_pairs == list_size) {
list_size *= 2;
XREALLOC_N(list_size, list);
......@@ -2754,92 +2543,50 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
list[num_pairs++] = p;
}
}
//Loop through the list.
//It is safe to access the memory (will not have been freed).
//If 'already_removed' is set, then we should release our reference
//and go to the next entry.
// first write out dirty PAIRs
BACKGROUND_JOB_MANAGER bjm = NULL;
bjm_init(&bjm);
for (i=0; i < num_pairs; i++) {
PAIR p = list[i];
if (!p->already_removed) {
assert(cf == 0 || p->cachefile==cf);
nfound++;
p->cq = &cq;
//
// Once again, the assumption is that any PAIR
// is either unlocked or on a writer thread work queue
//
if (!nb_mutex_writers(&p->value_nb_mutex)) {
flush_and_maybe_remove(ct, p);
}
}
ctpair_destroy(p); //Release our reference
}
toku_free(list);
// wait for all of the pairs in the work queue to complete
//
// If it were possible
// for some thread to change the state of the node before passing
// it off here, and a write to disk were necessary, then the code
// below would be wrong.
//
for (i=0; i<nfound; i++) {
cachetable_unlock(ct);
WORKITEM wi = 0;
//This workqueue's mutex is NOT the cachetable lock.
//You must not be holding the cachetable lock during the dequeue.
int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
//Some other thread owned the lock, but transferred ownership to the thread executing this function
cachetable_lock(ct);
PAIR p = workitem_arg(wi);
p->cq = 0;
// check some assertions.
// A checkpoint should not be running on this cachefile, so
// the checkpoint_pending bit must be FALSE and
// no other thread should be accessing this PAIR
assert(!p->checkpoint_pending);
// we are only thread using the PAIR
assert(nb_mutex_users(&p->value_nb_mutex) == 1);
assert(nb_mutex_users(&p->value_nb_mutex) == 0);
assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
assert(!p->cloned_value_data);
if (p->dirty == CACHETABLE_DIRTY) {
int r = bjm_add_background_job(bjm);
assert_zero(r);
struct pair_flush_for_close *XMALLOC(args);
args->p = p;
args->bjm = bjm;
workitem_init(&p->asyncwork, cachetable_flush_pair_for_close, args);
workqueue_enq(&ct->wq, &p->asyncwork, 0);
// first we remove the PAIR from the cachetable's linked lists
// and hashtable, so we guarantee that no other thread can access
// this PAIR if we release the cachetable lock (which happens in
// cachetable_only_write_locked_data() if the pair is dirty).
cachetable_remove_pair(ct, p);
//
// #5097 found a bug where another thread had a dirty PAIR pinned
// and was trying to run partial eviction. So, when the ownership
// of the lock is transferred here, the PAIR may still be dirty.
// If so, we need to write it to disk.
//
if (p->dirty) {
PAIR_ATTR attr;
cachetable_only_write_locked_data(
ct,
p,
FALSE, // not for a checkpoint, as we assert above
&attr,
FALSE // not a clone
);
}
// now that we are assured that the PAIR has been written to disk,
// we free the PAIR
nb_mutex_unlock(&p->value_nb_mutex); //Release the lock, no one has a pin, per our assumptions above.
cachetable_free_pair(ct, p);
}
workqueue_destroy(&cq);
cachetable_unlock(ct);
bjm_wait_for_jobs_to_finish(bjm);
bjm_destroy(bjm);
cachetable_lock(ct);
// now get rid of everything
for (i=0; i < num_pairs; i++) {
PAIR p = list[i];
assert(nb_mutex_users(&p->value_nb_mutex) == 0);
assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
assert(!p->cloned_value_data);
assert(p->dirty == CACHETABLE_CLEAN);
cachetable_maybe_remove_and_free_pair(ct, p);
}
if (cf) {
assert(toku_list_empty(&cf->pairs_for_cachefile));
cf->is_flushing = false;
} else {
assert_cachefile_is_flushed_and_removed(ct, cf);
bjm_reset(cf->bjm);
}
if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4)) {
cachetable_rehash(ct, ct->table_size/2);
}
}
toku_free(list);
}
/* Requires that no locks be held that are used by the checkpoint logic */
......@@ -2877,8 +2624,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
toku_destroy_workers(&ct->wq, &ct->threadpool);
toku_destroy_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool);
toku_kibbutz_destroy(ct->kibbutz);
toku_mutex_destroy(&ct->cachefiles_mutex);
toku_cond_destroy(&ct->clones_background_wait);
bjm_destroy(ct->checkpoint_clones_bjm);
toku_free(ct->table);
toku_free(ct->env_dir);
toku_free(ct);
......@@ -3144,11 +2890,9 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
CACHEFILE cf;
assert(ct->cachefiles_in_checkpoint==NULL);
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
// The caller must serialize open, close, and begin checkpoint.
// So we should never see a closing cachefile here.
assert(!cf->is_closing);
// putting this check so that this function may be called
// by cachetable tests
......@@ -3160,7 +2904,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
ct->cachefiles_in_checkpoint = cf;
cf->for_checkpoint = TRUE;
}
cachefiles_unlock(ct);
}
if (logger) {
......@@ -3177,7 +2920,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
//Must loop through ALL open files (even if not included in checkpoint).
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->log_fassociate_during_checkpoint) {
int r = cf->log_fassociate_during_checkpoint(cf, cf->userdata);
......@@ -3185,7 +2927,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
assert(r==0);
}
}
cachefiles_unlock(ct);
}
// Log all the open transactions MUST BE AFTER OPEN FILES
{
......@@ -3200,14 +2941,12 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
//Must loop through ALL open files (even if not included in checkpoint).
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->log_suppress_rollback_during_checkpoint) {
int r = cf->log_suppress_rollback_during_checkpoint(cf, cf->userdata);
assert(r==0);
}
}
cachefiles_unlock(ct);
}
}
......@@ -3255,7 +2994,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
rwlock_write_lock(&ct->pending_lock, ct->mutex);
ct->checkpoint_is_beginning = TRUE; // detect threadsafety bugs, must set checkpoint_is_beginning ...
invariant(ct->checkpoint_prohibited == 0); // ... before testing checkpoint_prohibited
invariant(ct->n_checkpoint_clones_running == 0);
bjm_reset(ct->checkpoint_clones_bjm);
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) {
......@@ -3291,7 +3030,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
//Once marked as pending, we own write locks on the pairs, which means the writer threads can't conflict.
{
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->begin_checkpoint_userdata) {
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
......@@ -3300,7 +3038,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT;
}
}
cachefiles_unlock(ct);
}
ct->checkpoint_is_beginning = FALSE; // clear before releasing cachetable lock
cachetable_unlock(ct);
......@@ -3338,11 +3075,10 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
// Don't need to unlock and lock cachetable, because the cachetable was unlocked and locked while the flush callback ran.
}
}
assert(!ct->pending_head);
while (ct->n_checkpoint_clones_running > 0) {
toku_cond_wait(&ct->clones_background_wait, ct->mutex);
}
assert(ct->n_checkpoint_clones_running == 0);
assert(!ct->pending_head);
cachetable_unlock(ct);
bjm_wait_for_jobs_to_finish(ct->checkpoint_clones_bjm);
cachetable_lock(ct);
{ // have just written data blocks, so next write the translation and header for each open dictionary
......@@ -3678,7 +3414,7 @@ toku_cleaner_thread (void *cachetable_v)
// - this is how a thread that is calling unpin_and_remove will prevent
// the cleaner thread from picking its PAIR (see comments in that function)
do {
if (nb_mutex_users(&ct->cleaner_head->value_nb_mutex) > 0 || ct->cleaner_head->cachefile->is_flushing) {
if (nb_mutex_users(&ct->cleaner_head->value_nb_mutex) > 0) {
goto next_pair;
}
n_seen++;
......@@ -3695,25 +3431,23 @@ toku_cleaner_thread (void *cachetable_v)
// that is, best_pair != NULL, we do the clean
//
if (best_pair) {
CACHEFILE cf = best_pair->cachefile;
// try to add a background job to the manager
// if we can't, that means the cachefile is flushing, so
// we simply continue the for loop and this iteration
// becomes a no-op
int abj_ret = bjm_add_background_job(cf->bjm);
if (abj_ret) {
cachetable_unlock(ct);
continue;
}
nb_mutex_lock(&best_pair->value_nb_mutex, ct->mutex);
// verify a key assumption.
assert(cleaner_thread_rate_pair(best_pair) > 0);
// the order of operations for these two pieces is important
// we must add the background job first, while we still have the
// cachetable lock and we are assured that the best_pair's
// cachefile is not flushing. Once we add the background
// job, we know that flushing a cachefile will wait on
// this background job to be completed.
// If we were to add the background job after
// writing a PAIR for checkpoint, then we risk
// releasing the cachetable lock during the write
// and allowing a cachefile flush to sneak in
add_background_job(best_pair->cachefile, true);
if (best_pair->checkpoint_pending) {
write_locked_pair_for_checkpoint(ct, best_pair);
}
CACHEFILE cf = best_pair->cachefile;
BOOL cleaner_callback_called = FALSE;
// it's theoretically possible that after writing a PAIR for checkpoint, the
......@@ -3742,7 +3476,7 @@ toku_cleaner_thread (void *cachetable_v)
// "add/remove_background_job" business, which means the
// cachefile is still valid here, even though the cleaner
// callback unlocks the pair.
remove_background_job(cf, true);
bjm_remove_background_job(cf->bjm);
cachetable_unlock(ct);
}
else {
......@@ -3764,8 +3498,6 @@ toku_cachetable_helgrind_ignore(void) {
VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_maybe_get_and_pins, sizeof cachetable_maybe_get_and_pins);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_maybe_get_and_pin_hits, sizeof cachetable_maybe_get_and_pin_hits);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status);
......
......@@ -74,9 +74,6 @@ int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
// Requires no locks be held that are taken by the checkpoint function
void toku_cachetable_minicron_shutdown(CACHETABLE ct);
// Wait for the cachefile's background work to finish.
void toku_cachefile_wait_for_background_work_to_quiesce(CACHEFILE cf);
// Close the cachetable.
// Effects: All of the memory objects are flushed to disk, and the cachetable is destroyed.
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
......@@ -477,8 +474,6 @@ typedef enum {
CT_MISSTIME, // how many usec spent waiting for disk read because of cache miss
CT_PUTS, // how many times has a newly created node been put into the cachetable?
CT_PREFETCHES, // how many times has a block been prefetched into the cachetable?
CT_MAYBE_GET_AND_PINS, // how many times has maybe_get_and_pin(_clean) been called?
CT_MAYBE_GET_AND_PIN_HITS, // how many times has maybe_get_and_pin(_clean) returned with a node?
CT_SIZE_CURRENT, // the sum of the sizes of the nodes represented in the cachetable
CT_SIZE_LIMIT, // the limit to the sum of the node sizes
CT_SIZE_MAX, // high water mark of size_current (max value size_current ever had)
......@@ -506,12 +501,9 @@ char * toku_construct_full_name(int count, ...);
char * toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env);
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra);
// Effect: Add a job to the cachetable's collection of work to do. Note that function f must call remove_background_job()
// Effect: Add a job to the cachetable's collection of work to do. Note that function f must call remove_background_job_from_cf()
void add_background_job (CACHEFILE cf, bool already_locked);
// Effect: When a kibbutz job or cleaner thread starts working, the
// cachefile must be notified (so during a close it can wait);
void remove_background_job (CACHEFILE cf, bool already_locked);
void remove_background_job_from_cf (CACHEFILE cf);
// Effect: When a kibbutz job or cleaner thread finishes in a cachefile,
// the cachetable must be notified.
......
......@@ -1758,7 +1758,7 @@ static void flush_node_fun(void *fe_v)
// It is the responsibility of flush_some_child to unlock the node
flush_some_child(fe->h, fe->node, &fa);
}
remove_background_job(fe->h->cf, false);
remove_background_job_from_cf(fe->h->cf);
toku_free(fe);
}
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ifndef KIBBUTZ_H
#define KIBBUTZ_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......
......@@ -15,9 +15,12 @@ static void kibbutz_work(void *fe_v)
CACHEFILE f1 = fe_v;
sleep(2);
foo = TRUE;
int r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
// note that we make the size 16 to induce an eviction
// once evictions are moved to their own thread, we need
// to modify this test
int r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(16));
assert(r==0);
remove_background_job(f1, false);
remove_background_job_from_cf(f1);
}
......
......@@ -52,7 +52,7 @@ static void kibbutz_work(void *fe_v)
foo = TRUE;
int r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
remove_background_job(f1, false);
remove_background_job_from_cf(f1);
}
......
......@@ -45,7 +45,7 @@ static void kibbutz_work(void *fe_v)
foo = TRUE;
int r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
assert(r==0);
remove_background_job(f1, false);
remove_background_job_from_cf(f1);
}
static void
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test-kibbutz2.c 43762 2012-05-22 16:17:53Z yfogel $"
#ident "Copyright (c) 2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "background_job_manager.h"
#include "includes.h"
#include "test.h"
BACKGROUND_JOB_MANAGER bjm;
static void *finish_bjm(void *arg) {
bjm_wait_for_jobs_to_finish(bjm);
return arg;
}
static void bjm_test(void) {
int r = 0;
bjm = NULL;
bjm_init(&bjm);
// test simple add/remove of background job works
r = bjm_add_background_job(bjm);
assert_zero(r);
bjm_remove_background_job(bjm);
bjm_wait_for_jobs_to_finish(bjm);
// assert that you cannot add a background job
// without resetting bjm after waiting
// for finish
r = bjm_add_background_job(bjm);
assert(r != 0);
// test that after a reset, we can resume adding background jobs
bjm_reset(bjm);
r = bjm_add_background_job(bjm);
assert_zero(r);
bjm_remove_background_job(bjm);
bjm_wait_for_jobs_to_finish(bjm);
bjm_reset(bjm);
r = bjm_add_background_job(bjm);
assert_zero(r);
toku_pthread_t tid;
r = toku_pthread_create(&tid, NULL, finish_bjm, NULL);
assert_zero(r);
usleep(2*1024*1024);
// should return non-zero because tid is waiting
// for background jobs to finish
r = bjm_add_background_job(bjm);
assert(r != 0);
bjm_remove_background_job(bjm);
void *ret;
r = toku_pthread_join(tid, &ret);
assert_zero(r);
bjm_destroy(bjm);
}
int
test_main (int argc , const char *argv[]) {
default_parse_args(argc, argv);
bjm_test();
if (verbose) printf("test ok\n");
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment