Commit ece1d4c4 authored by John Esmet's avatar John Esmet

FT-245 Move queue to util, use toku_ prefix for consistency with the

rest of the code base
parent 39397aab
...@@ -54,7 +54,6 @@ set(FT_SOURCES ...@@ -54,7 +54,6 @@ set(FT_SOURCES
log_upgrade log_upgrade
minicron minicron
pqueue pqueue
queue
quicklz quicklz
recover recover
rollback rollback
......
...@@ -93,7 +93,7 @@ PATENT RIGHTS GRANT: ...@@ -93,7 +93,7 @@ PATENT RIGHTS GRANT:
#include <db.h> #include <db.h>
#include "fttypes.h" #include "fttypes.h"
#include "ftloader.h" #include "ftloader.h"
#include "queue.h" #include "util/queue.h"
#include <toku_pthread.h> #include <toku_pthread.h>
#include "dbufio.h" #include "dbufio.h"
......
...@@ -423,7 +423,7 @@ void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error) { ...@@ -423,7 +423,7 @@ void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error) {
destroy_rowset(&bl->primary_rowset); destroy_rowset(&bl->primary_rowset);
if (bl->primary_rowset_queue) { if (bl->primary_rowset_queue) {
queue_destroy(bl->primary_rowset_queue); toku_queue_destroy(bl->primary_rowset_queue);
bl->primary_rowset_queue = nullptr; bl->primary_rowset_queue = nullptr;
} }
...@@ -629,7 +629,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -629,7 +629,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl)); int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; } if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
} }
{ int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); { int r = toku_queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; } if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
} }
{ {
...@@ -1138,7 +1138,7 @@ static void* extractor_thread (void *blv) { ...@@ -1138,7 +1138,7 @@ static void* extractor_thread (void *blv) {
while (1) { while (1) {
void *item; void *item;
{ {
int rq = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL); int rq = toku_queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
if (rq==EOF) break; if (rq==EOF) break;
invariant(rq==0); // other errors are arbitrarily bad. invariant(rq==0); // other errors are arbitrarily bad.
} }
...@@ -1169,7 +1169,7 @@ static void enqueue_for_extraction (FTLOADER bl) { ...@@ -1169,7 +1169,7 @@ static void enqueue_for_extraction (FTLOADER bl) {
struct rowset *XMALLOC(enqueue_me); struct rowset *XMALLOC(enqueue_me);
*enqueue_me = bl->primary_rowset; *enqueue_me = bl->primary_rowset;
zero_rowset(&bl->primary_rowset); zero_rowset(&bl->primary_rowset);
int r = queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL); int r = toku_queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL);
resource_assert_zero(r); resource_assert_zero(r);
} }
...@@ -1206,7 +1206,7 @@ finish_extractor (FTLOADER bl) { ...@@ -1206,7 +1206,7 @@ finish_extractor (FTLOADER bl) {
} }
//printf("%s:%d please finish extraction\n", __FILE__, __LINE__); //printf("%s:%d please finish extraction\n", __FILE__, __LINE__);
{ {
int r = queue_eof(bl->primary_rowset_queue); int r = toku_queue_eof(bl->primary_rowset_queue);
invariant(r==0); invariant(r==0);
} }
//printf("%s:%d joining\n", __FILE__, __LINE__); //printf("%s:%d joining\n", __FILE__, __LINE__);
...@@ -1218,7 +1218,7 @@ finish_extractor (FTLOADER bl) { ...@@ -1218,7 +1218,7 @@ finish_extractor (FTLOADER bl) {
bl->extractor_live = false; bl->extractor_live = false;
} }
{ {
int r = queue_destroy(bl->primary_rowset_queue); int r = toku_queue_destroy(bl->primary_rowset_queue);
invariant(r==0); invariant(r==0);
bl->primary_rowset_queue = nullptr; bl->primary_rowset_queue = nullptr;
} }
...@@ -1882,7 +1882,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q ...@@ -1882,7 +1882,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q
if (to_q) { if (to_q) {
if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) { if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
{ {
int r = queue_enq(q, (void*)output_rowset, 1, NULL); int r = toku_queue_enq(q, (void*)output_rowset, 1, NULL);
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
...@@ -1958,7 +1958,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q ...@@ -1958,7 +1958,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q
} }
if (result==0 && to_q) { if (result==0 && to_q) {
int r = queue_enq(q, (void*)output_rowset, 1, NULL); int r = toku_queue_enq(q, (void*)output_rowset, 1, NULL);
if (r!=0) if (r!=0)
result = r; result = r;
else else
...@@ -2149,7 +2149,7 @@ int merge_files (struct merge_fileset *fs, ...@@ -2149,7 +2149,7 @@ int merge_files (struct merge_fileset *fs,
if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr); if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr);
{ {
int r = queue_eof(output_q); int r = toku_queue_eof(output_q);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
// It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0) // It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0)
...@@ -2371,7 +2371,7 @@ static int write_header (struct dbout *out, long long translation_location_on_di ...@@ -2371,7 +2371,7 @@ static int write_header (struct dbout *out, long long translation_location_on_di
static void drain_writer_q(QUEUE q) { static void drain_writer_q(QUEUE q) {
void *item; void *item;
while (1) { while (1) {
int r = queue_deq(q, &item, NULL, NULL); int r = toku_queue_deq(q, &item, NULL, NULL);
if (r == EOF) if (r == EOF)
break; break;
invariant(r == 0); invariant(r == 0);
...@@ -2501,7 +2501,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, ...@@ -2501,7 +2501,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
while (result == 0) { while (result == 0) {
void *item; void *item;
{ {
int rr = queue_deq(q, &item, NULL, NULL); int rr = toku_queue_deq(q, &item, NULL, NULL);
if (rr == EOF) break; if (rr == EOF) break;
if (rr != 0) { if (rr != 0) {
ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr); ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr);
...@@ -2723,7 +2723,7 @@ static int loader_do_i (FTLOADER bl, ...@@ -2723,7 +2723,7 @@ static int loader_do_i (FTLOADER bl,
struct rowset *rows = &(bl->rows[which_db]); struct rowset *rows = &(bl->rows[which_db]);
invariant(rows->data==NULL); // the rows should be all cleaned up already invariant(rows->data==NULL); // the rows should be all cleaned up already
int r = queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH); int r = toku_queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
if (r) goto error; if (r) goto error;
{ {
...@@ -2767,7 +2767,7 @@ static int loader_do_i (FTLOADER bl, ...@@ -2767,7 +2767,7 @@ static int loader_do_i (FTLOADER bl,
r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta); r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
if (r) { if (r) {
int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]); int r2 __attribute__((__unused__)) = toku_queue_destroy(bl->fractal_queues[which_db]);
// ignore r2, since we already have an error // ignore r2, since we already have an error
bl->fractal_queues[which_db] = nullptr; bl->fractal_queues[which_db] = nullptr;
goto error; goto error;
...@@ -2788,7 +2788,7 @@ static int loader_do_i (FTLOADER bl, ...@@ -2788,7 +2788,7 @@ static int loader_do_i (FTLOADER bl,
if (r == 0) r = fta.errno_result; if (r == 0) r = fta.errno_result;
} }
} else { } else {
queue_eof(bl->fractal_queues[which_db]); toku_queue_eof(bl->fractal_queues[which_db]);
r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation, r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation,
bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db, bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db,
target_nodesize, target_basementnodesize, target_compression_method, target_fanout); target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
...@@ -2797,7 +2797,7 @@ static int loader_do_i (FTLOADER bl, ...@@ -2797,7 +2797,7 @@ static int loader_do_i (FTLOADER bl,
error: // this is the cleanup code. Even if r==0 (no error) we fall through to here. error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
if (bl->fractal_queues[which_db]) { if (bl->fractal_queues[which_db]) {
int r2 = queue_destroy(bl->fractal_queues[which_db]); int r2 = toku_queue_destroy(bl->fractal_queues[which_db]);
invariant(r2==0); invariant(r2==0);
bl->fractal_queues[which_db] = nullptr; bl->fractal_queues[which_db] = nullptr;
} }
......
...@@ -183,7 +183,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) { ...@@ -183,7 +183,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) {
// feed rowsets to the extractor // feed rowsets to the extractor
for (int i = 0; i < nrowsets; i++) { for (int i = 0; i < nrowsets; i++) {
r = queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL); r = toku_queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
assert(r == 0); assert(r == 0);
} }
......
...@@ -201,7 +201,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char ...@@ -201,7 +201,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char
// feed rowsets to the extractor // feed rowsets to the extractor
for (int i = 0; i < nrowsets; i++) { for (int i = 0; i < nrowsets; i++) {
r = queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL); r = toku_queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
assert(r == 0); assert(r == 0);
} }
......
...@@ -415,7 +415,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) { ...@@ -415,7 +415,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) {
// feed rowsets to the extractor // feed rowsets to the extractor
for (int i = 0; i < nrowsets; i++) { for (int i = 0; i < nrowsets; i++) {
r = queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL); r = toku_queue_enq(loader->primary_rowset_queue, rowset[i], 1, NULL);
assert(r == 0); assert(r == 0);
} }
r = toku_ft_loader_finish_extractor(loader); r = toku_ft_loader_finish_extractor(loader);
......
...@@ -346,7 +346,7 @@ static void *consumer_thread (void *ctv) { ...@@ -346,7 +346,7 @@ static void *consumer_thread (void *ctv) {
struct consumer_thunk *cthunk = (struct consumer_thunk *)ctv; struct consumer_thunk *cthunk = (struct consumer_thunk *)ctv;
while (1) { while (1) {
void *item; void *item;
int r = queue_deq(cthunk->q, &item, NULL, NULL); int r = toku_queue_deq(cthunk->q, &item, NULL, NULL);
if (r==EOF) return NULL; if (r==EOF) return NULL;
assert(r==0); assert(r==0);
struct rowset *rowset = (struct rowset *)item; struct rowset *rowset = (struct rowset *)item;
...@@ -423,7 +423,7 @@ static void test (const char *directory, bool is_error) { ...@@ -423,7 +423,7 @@ static void test (const char *directory, bool is_error) {
ft_loader_set_fractal_workers_count_from_c(bl); ft_loader_set_fractal_workers_count_from_c(bl);
QUEUE q; QUEUE q;
{ int r = queue_create(&q, 1000); assert(r==0); } { int r = toku_queue_create(&q, 1000); assert(r==0); }
DBUFIO_FILESET bfs; DBUFIO_FILESET bfs;
const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues. const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
{ int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); } { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); }
...@@ -474,7 +474,7 @@ static void test (const char *directory, bool is_error) { ...@@ -474,7 +474,7 @@ static void test (const char *directory, bool is_error) {
panic_dbufio_fileset(bfs, r); panic_dbufio_fileset(bfs, r);
} }
{ {
int r = queue_eof(q); int r = toku_queue_eof(q);
assert(r==0); assert(r==0);
} }
...@@ -501,7 +501,7 @@ static void test (const char *directory, bool is_error) { ...@@ -501,7 +501,7 @@ static void test (const char *directory, bool is_error) {
} }
} }
{ {
int r = queue_destroy(q); int r = toku_queue_destroy(q);
assert(r==0); assert(r==0);
} }
toku_ft_loader_internal_destroy(bl, false); toku_ft_loader_internal_destroy(bl, false);
......
...@@ -159,20 +159,20 @@ static int write_dbfile (char *tf_template, int n, char *output_name, bool expec ...@@ -159,20 +159,20 @@ static int write_dbfile (char *tf_template, int n, char *output_name, bool expec
ft_loader_fi_close_all(&bl.file_infos); ft_loader_fi_close_all(&bl.file_infos);
QUEUE q; QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r); r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
assert(fs.n_temp_files==0); assert(fs.n_temp_files==0);
QUEUE q2; QUEUE q2;
r = queue_create(&q2, 0xFFFFFFFF); // infinite queue. r = toku_queue_create(&q2, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
size_t num_found = 0; size_t num_found = 0;
size_t found_size_est = 0; size_t found_size_est = 0;
while (1) { while (1) {
void *v; void *v;
r = queue_deq(q, &v, NULL, NULL); r = toku_queue_deq(q, &v, NULL, NULL);
if (r==EOF) break; if (r==EOF) break;
struct rowset *rs = (struct rowset *)v; struct rowset *rs = (struct rowset *)v;
if (verbose) printf("v=%p\n", v); if (verbose) printf("v=%p\n", v);
...@@ -187,16 +187,16 @@ static int write_dbfile (char *tf_template, int n, char *output_name, bool expec ...@@ -187,16 +187,16 @@ static int write_dbfile (char *tf_template, int n, char *output_name, bool expec
num_found += rs->n_rows; num_found += rs->n_rows;
r = queue_enq(q2, v, 0, NULL); r = toku_queue_enq(q2, v, 0, NULL);
assert(r==0); assert(r==0);
} }
assert((int)num_found == n); assert((int)num_found == n);
if (!expect_error) assert(found_size_est == size_est); if (!expect_error) assert(found_size_est == size_est);
r = queue_eof(q2); r = toku_queue_eof(q2);
assert(r==0); assert(r==0);
r = queue_destroy(q); r = toku_queue_destroy(q);
assert(r==0); assert(r==0);
DESCRIPTOR_S desc; DESCRIPTOR_S desc;
...@@ -225,7 +225,7 @@ static int write_dbfile (char *tf_template, int n, char *output_name, bool expec ...@@ -225,7 +225,7 @@ static int write_dbfile (char *tf_template, int n, char *output_name, bool expec
ft_loader_destroy_poll_callback(&bl.poll_callback); ft_loader_destroy_poll_callback(&bl.poll_callback);
ft_loader_lock_destroy(&bl); ft_loader_lock_destroy(&bl);
r = queue_destroy(q2); r = toku_queue_destroy(q2);
assert(r==0); assert(r==0);
destroy_merge_fileset(&fs); destroy_merge_fileset(&fs);
......
...@@ -215,20 +215,20 @@ static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNI ...@@ -215,20 +215,20 @@ static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNI
ft_loader_fi_close_all(&bl.file_infos); ft_loader_fi_close_all(&bl.file_infos);
QUEUE q; QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r); r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
assert(fs.n_temp_files==0); assert(fs.n_temp_files==0);
QUEUE q2; QUEUE q2;
r = queue_create(&q2, 0xFFFFFFFF); // infinite queue. r = toku_queue_create(&q2, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
size_t num_found = 0; size_t num_found = 0;
size_t found_size_est = 0; size_t found_size_est = 0;
while (1) { while (1) {
void *v; void *v;
r = queue_deq(q, &v, NULL, NULL); r = toku_queue_deq(q, &v, NULL, NULL);
if (r==EOF) break; if (r==EOF) break;
struct rowset *rs = (struct rowset *)v; struct rowset *rs = (struct rowset *)v;
if (verbose) printf("v=%p\n", v); if (verbose) printf("v=%p\n", v);
...@@ -243,16 +243,16 @@ static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNI ...@@ -243,16 +243,16 @@ static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNI
num_found += rs->n_rows; num_found += rs->n_rows;
r = queue_enq(q2, v, 0, NULL); r = toku_queue_enq(q2, v, 0, NULL);
assert(r==0); assert(r==0);
} }
assert((int)num_found == n); assert((int)num_found == n);
assert(found_size_est == size_est); assert(found_size_est == size_est);
r = queue_eof(q2); r = toku_queue_eof(q2);
assert(r==0); assert(r==0);
r = queue_destroy(q); r = toku_queue_destroy(q);
assert(r==0); assert(r==0);
DESCRIPTOR_S desc; DESCRIPTOR_S desc;
...@@ -265,7 +265,7 @@ static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNI ...@@ -265,7 +265,7 @@ static void test_write_dbfile (char *tf_template, int n, char *output_name, TXNI
r = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16); r = toku_loader_write_ft_from_q_in_C(&bl, &desc, fd, 1000, q2, size_est, 0, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 16);
assert(r==0); assert(r==0);
r = queue_destroy(q2); r = toku_queue_destroy(q2);
assert_zero(r); assert_zero(r);
destroy_merge_fileset(&fs); destroy_merge_fileset(&fs);
......
...@@ -412,7 +412,7 @@ static void test_merge_files (const char *tf_template, const char *output_name) ...@@ -412,7 +412,7 @@ static void test_merge_files (const char *tf_template, const char *output_name)
ft_loader_fi_close_all(&bl.file_infos); ft_loader_fi_close_all(&bl.file_infos);
QUEUE q; QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. r = toku_queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0); assert(r==0);
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r); r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
...@@ -436,7 +436,7 @@ static void test_merge_files (const char *tf_template, const char *output_name) ...@@ -436,7 +436,7 @@ static void test_merge_files (const char *tf_template, const char *output_name)
// verify the dbfile // verify the dbfile
verify_dbfile(10, sorted_keys, sorted_vals, output_name); verify_dbfile(10, sorted_keys, sorted_vals, output_name);
r = queue_destroy(q); r = toku_queue_destroy(q);
assert(r==0); assert(r==0);
} }
......
...@@ -5,6 +5,7 @@ set(util_srcs ...@@ -5,6 +5,7 @@ set(util_srcs
memarena memarena
mempool mempool
partitioned_counter partitioned_counter
queue
threadpool threadpool
scoped_malloc scoped_malloc
x1764 x1764
......
...@@ -128,7 +128,7 @@ struct queue { ...@@ -128,7 +128,7 @@ struct queue {
// q->mutex and q->cond are used as condition variables. // q->mutex and q->cond are used as condition variables.
int queue_create (QUEUE *q, uint64_t weight_limit) int toku_queue_create (QUEUE *q, uint64_t weight_limit)
{ {
QUEUE CALLOC(result); QUEUE CALLOC(result);
if (result==NULL) return get_error_errno(); if (result==NULL) return get_error_errno();
...@@ -143,7 +143,7 @@ int queue_create (QUEUE *q, uint64_t weight_limit) ...@@ -143,7 +143,7 @@ int queue_create (QUEUE *q, uint64_t weight_limit)
return 0; return 0;
} }
int queue_destroy (QUEUE q) int toku_queue_destroy (QUEUE q)
{ {
if (q->head) return EINVAL; if (q->head) return EINVAL;
assert(q->contents_weight==0); assert(q->contents_weight==0);
...@@ -153,7 +153,7 @@ int queue_destroy (QUEUE q) ...@@ -153,7 +153,7 @@ int queue_destroy (QUEUE q)
return 0; return 0;
} }
int queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_after_enq) int toku_queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_after_enq)
{ {
toku_mutex_lock(&q->mutex); toku_mutex_lock(&q->mutex);
assert(!q->eof); assert(!q->eof);
...@@ -189,7 +189,7 @@ int queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_afte ...@@ -189,7 +189,7 @@ int queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_afte
return 0; return 0;
} }
int queue_eof (QUEUE q) int toku_queue_eof (QUEUE q)
{ {
toku_mutex_lock(&q->mutex); toku_mutex_lock(&q->mutex);
assert(!q->eof); assert(!q->eof);
...@@ -199,7 +199,7 @@ int queue_eof (QUEUE q) ...@@ -199,7 +199,7 @@ int queue_eof (QUEUE q)
return 0; return 0;
} }
int queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_after_deq) int toku_queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_after_deq)
{ {
toku_mutex_lock(&q->mutex); toku_mutex_lock(&q->mutex);
int result; int result;
......
...@@ -92,8 +92,6 @@ PATENT RIGHTS GRANT: ...@@ -92,8 +92,6 @@ PATENT RIGHTS GRANT:
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "fttypes.h"
// The abstraction: // The abstraction:
// //
// queue.h implements a queue suitable for a producer-consumer relationship between two pthreads. // queue.h implements a queue suitable for a producer-consumer relationship between two pthreads.
...@@ -110,21 +108,21 @@ PATENT RIGHTS GRANT: ...@@ -110,21 +108,21 @@ PATENT RIGHTS GRANT:
typedef struct queue *QUEUE; typedef struct queue *QUEUE;
int queue_create (QUEUE *q, uint64_t weight_limit); int toku_queue_create (QUEUE *q, uint64_t weight_limit);
// Effect: Create a queue with a given weight limit. The queue is initially empty. // Effect: Create a queue with a given weight limit. The queue is initially empty.
int queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_after_enq); int toku_queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_after_enq);
// Effect: Insert ITEM of weight WEIGHT into queue. If the resulting contents weight too much then block (don't return) until the total weight is low enough. // Effect: Insert ITEM of weight WEIGHT into queue. If the resulting contents weight too much then block (don't return) until the total weight is low enough.
// If total_weight_after_enq!=NULL then return the current weight of the items in the queue (after finishing blocking on overweight, and after enqueueing the item). // If total_weight_after_enq!=NULL then return the current weight of the items in the queue (after finishing blocking on overweight, and after enqueueing the item).
// If successful return 0. // If successful return 0.
// If an error occurs, return the error number, and the state of the queue is undefined. The item may have been enqueued or not, and in fact the queue may be badly corrupted if the condition variables go awry. If it's just a matter of out-of-memory, then the queue is probably OK. // If an error occurs, return the error number, and the state of the queue is undefined. The item may have been enqueued or not, and in fact the queue may be badly corrupted if the condition variables go awry. If it's just a matter of out-of-memory, then the queue is probably OK.
// Requires: There is only a single consumer. (We wake up the consumer using a pthread_cond_signal (which is suitable only for single consumers.) // Requires: There is only a single consumer. (We wake up the consumer using a pthread_cond_signal (which is suitable only for single consumers.)
int queue_eof (QUEUE q); int toku_queue_eof (QUEUE q);
// Effect: Inform the queue that no more values will be inserted. After all the values that have been inserted are dequeued, further dequeue operations will return EOF. // Effect: Inform the queue that no more values will be inserted. After all the values that have been inserted are dequeued, further dequeue operations will return EOF.
// Returns 0 on success. On failure, things are pretty bad (likely to be some sort of mutex failure). // Returns 0 on success. On failure, things are pretty bad (likely to be some sort of mutex failure).
int queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_after_deq); int toku_queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_after_deq);
// Effect: Wait until the queue becomes nonempty. Then dequeue and return the oldest item. The item and its weight are returned in *ITEM. // Effect: Wait until the queue becomes nonempty. Then dequeue and return the oldest item. The item and its weight are returned in *ITEM.
// If weight!=NULL then return the item's weight in *weight. // If weight!=NULL then return the item's weight in *weight.
// If total_weight_after_deq!=NULL then return the current weight of the items in the queue (after dequeuing the item). // If total_weight_after_deq!=NULL then return the current weight of the items in the queue (after dequeuing the item).
...@@ -132,7 +130,7 @@ int queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_af ...@@ -132,7 +130,7 @@ int queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_af
// Return EOF is we no more items will be returned. // Return EOF is we no more items will be returned.
// Usage note: The queue should be destroyed only after any consumers will no longer look at it (for example, they saw EOF). // Usage note: The queue should be destroyed only after any consumers will no longer look at it (for example, they saw EOF).
int queue_destroy (QUEUE q); int toku_queue_destroy (QUEUE q);
// Effect: Destroy the queue. // Effect: Destroy the queue.
// Requires: The queue must be empty and no consumer should try to dequeue after this (one way to do this is to make sure the consumer saw EOF). // Requires: The queue must be empty and no consumer should try to dequeue after this (one way to do this is to make sure the consumer saw EOF).
// Returns 0 on success. If the queue is not empty, returns EINVAL. Other errors are likely to be bad (some sort of mutex or condvar failure). // Returns 0 on success. If the queue is not empty, returns EINVAL. Other errors are likely to be bad (some sort of mutex or condvar failure).
......
...@@ -94,7 +94,7 @@ PATENT RIGHTS GRANT: ...@@ -94,7 +94,7 @@ PATENT RIGHTS GRANT:
#include <unistd.h> #include <unistd.h>
#include <toku_assert.h> #include <toku_assert.h>
#include <toku_pthread.h> #include <toku_pthread.h>
#include "queue.h" #include "util/queue.h"
static int verbose=1; static int verbose=1;
...@@ -108,7 +108,7 @@ static void *start_0 (void *arg) { ...@@ -108,7 +108,7 @@ static void *start_0 (void *arg) {
long count = 0; long count = 0;
while (1) { while (1) {
uint64_t this_max_weight; uint64_t this_max_weight;
int r=queue_deq(q, &item, &weight, &this_max_weight); int r=toku_queue_deq(q, &item, &weight, &this_max_weight);
if (r==EOF) break; if (r==EOF) break;
assert(r==0); assert(r==0);
if (this_max_weight>d_max_weight) d_max_weight=this_max_weight; if (this_max_weight>d_max_weight) d_max_weight=this_max_weight;
...@@ -123,7 +123,7 @@ static void *start_0 (void *arg) { ...@@ -123,7 +123,7 @@ static void *start_0 (void *arg) {
static void enq (QUEUE q, long v, uint64_t weight) { static void enq (QUEUE q, long v, uint64_t weight) {
uint64_t this_max_weight; uint64_t this_max_weight;
int r = queue_enq(q, (void*)v, (weight==0)?0:1, &this_max_weight); int r = toku_queue_enq(q, (void*)v, (weight==0)?0:1, &this_max_weight);
assert(r==0); assert(r==0);
if (this_max_weight>e_max_weight) e_max_weight=this_max_weight; if (this_max_weight>e_max_weight) e_max_weight=this_max_weight;
//printf("E(%ld)=%ld %ld\n", v, this_max_weight, e_max_weight); //printf("E(%ld)=%ld %ld\n", v, this_max_weight, e_max_weight);
...@@ -138,7 +138,7 @@ static void queue_test_0 (uint64_t weight) ...@@ -138,7 +138,7 @@ static void queue_test_0 (uint64_t weight)
d_max_weight = 0; d_max_weight = 0;
QUEUE q; QUEUE q;
int r; int r;
r = queue_create(&q, weight); assert(r==0); r = toku_queue_create(&q, weight); assert(r==0);
toku_pthread_t thread; toku_pthread_t thread;
r = toku_pthread_create(&thread, NULL, start_0, q); assert(r==0); r = toku_pthread_create(&thread, NULL, start_0, q); assert(r==0);
enq(q, 0L, weight); enq(q, 0L, weight);
...@@ -148,12 +148,12 @@ static void queue_test_0 (uint64_t weight) ...@@ -148,12 +148,12 @@ static void queue_test_0 (uint64_t weight)
sleep(1); sleep(1);
enq(q, 4L, weight); enq(q, 4L, weight);
enq(q, 5L, weight); enq(q, 5L, weight);
r = queue_eof(q); assert(r==0); r = toku_queue_eof(q); assert(r==0);
void *result; void *result;
r = toku_pthread_join(thread, &result); assert(r==0); r = toku_pthread_join(thread, &result); assert(r==0);
assert(result==NULL); assert(result==NULL);
assert(count_0==6); assert(count_0==6);
r = queue_destroy(q); r = toku_queue_destroy(q);
assert(d_max_weight <= weight); assert(d_max_weight <= weight);
assert(e_max_weight <= weight); assert(e_max_weight <= weight);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment