Commit dfdaee29 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Candidate fix for #2730 (the race in the loader and the io-thread where they...

Candidate fix for #2730 (the race in the loader and the io-thread where they glare at each other).  Refs #2730. [t:2730]

git-svn-id: file:///svn/toku/tokudb@21082 c7de825b-a66e-492c-adef-691d508d4ae1
parent 59540179
......@@ -151,6 +151,8 @@ struct brtloader_s {
int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0
// We use an integer so that we can add to the progress using a fetch-and-add instruction.
int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again).
LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader.
QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
......
......@@ -443,6 +443,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
bl->n_rows = 0;
bl->progress = 0;
bl->progress_callback_result = 0;
MY_CALLOC_N(N, bl->rows);
MY_CALLOC_N(N, bl->fs);
......@@ -1376,8 +1377,16 @@ static int update_progress (int N,
{ int r = toku_pthread_mutex_lock(&update_progress_lock); resource_assert(r == 0); }
bl->progress+=N;
//printf(" %20s: %d ", message, bl->progress);
int result = brt_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
int result;
if (bl->progress_callback_result == 0) {
//printf(" %20s: %d ", message, bl->progress);
result = brt_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
if (result!=0) {
bl->progress_callback_result = result;
}
} else {
result = bl->progress_callback_result;
}
{ int r = toku_pthread_mutex_unlock(&update_progress_lock); resource_assert(r == 0); }
return result;
}
......@@ -1534,12 +1543,11 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
//printf(" n_rows=%ld\n", n_rows);
while (result==0 && pqueue_size(pq)>0) {
int r;
int mini;
{
// get the minimum
pqueue_node_t *node;
r = pqueue_pop(pq, &node);
int r = pqueue_pop(pq, &node);
if (r!=0) {
result = r;
lazy_assert(0);
......@@ -1550,27 +1558,33 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
if (to_q) {
if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
BL_TRACE(blt_do_i);
r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
{
int r = queue_enq(q, (void*)output_rowset, 1, NULL);
BL_TRACE(blt_fractal_enq);
if (r!=0) {
result = r;
break;
}
}
XMALLOC(output_rowset); // freed in cleanup
{
int r = init_rowset(output_rowset, memory_per_rowset(bl));
if (r!=0) {
result = r;
break;
}
}
}
{
int r = add_row(output_rowset, &keys[mini], &vals[mini]);
if (r!=0) {
result = r;
break;
}
XMALLOC(output_rowset); // freed in cleanup
r = init_rowset(output_rowset, memory_per_rowset(bl));
if (r!=0) {
result = r;
break;
}
}
r = add_row(output_rowset, &keys[mini], &vals[mini]);
if (r!=0) {
result = r;
break;
}
} else {
// write it to the dest file
r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
if (r!=0) {
result = r;
break;
......@@ -1580,7 +1594,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
{
// read next row from file that just sourced min value
BL_TRACE_QUIET(blt_do_i);
r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
int r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
BL_TRACE_QUIET(blt_read_row);
if (r!=0) {
if (r==EOF) {
......@@ -1592,8 +1606,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
result = r;
break;
}
}
else {
} else {
// insert value into queue (re-populate queue)
pq_nodes[mini].key = &keys[mini];
r = pqueue_insert(pq, &pq_nodes[mini]);
......@@ -1614,13 +1627,9 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
invariant(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1);
int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation;
progress_allocation -= progress_just_done;
r = update_progress(progress_just_done, bl, "in file merge");
//printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
if (r!=0) {
invariant(result==0);
result=r;
break;
}
// ignore the result from update_progress here, we'll call update_progress again below, which will give us the nonzero result.
int r = update_progress(progress_just_done, bl, "in file merge");
if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
}
}
if (result==0 && to_q) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment