Commit f1a13a74 authored by Yoni Fogel's avatar Yoni Fogel

refs #5663 Merge #5663 onto main

git-svn-id: file:///svn/toku/tokudb@51238 c7de825b-a66e-492c-adef-691d508d4ae1
parent 755ee90a
......@@ -11,6 +11,9 @@
#include <unistd.h>
#include "memory.h"
#include <string.h>
#include "ftloader-internal.h"
#include "ft-internal.h"
#include "ft.h"
struct dbufio_file {
// i/o thread owns these
......@@ -18,7 +21,7 @@ struct dbufio_file {
// consumers own these
size_t offset_in_buf;
toku_off_t offset_in_file;
toku_off_t offset_in_uncompressed_file;
// need the mutex to modify these
struct dbufio_file *next;
......@@ -49,6 +52,7 @@ struct dbufio_fileset {
size_t bufsize; // the bufsize is the constant (the same for all buffers).
bool panic;
bool compressed;
int panic_errno;
toku_pthread_t iothread;
};
......@@ -75,6 +79,162 @@ static bool paniced (DBUFIO_FILESET bfs) {
return bfs->panic;
}
static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
ssize_t ret;
invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
unsigned char *raw_block = NULL;
// deserialize the sub block header
// total_size
// num_sub_blocks
// compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times)
ssize_t readcode;
const uint32_t header_size = sizeof(uint32_t);
char header[header_size];
readcode = toku_os_read(dbf->fd, &header, header_size);
if (readcode < 0) {
ret = -1;
goto exit;
}
if (readcode == 0) {
ret = 0;
goto exit;
}
if (readcode < header_size) {
errno = TOKUDB_NO_DATA;
ret = -1;
goto exit;
}
uint32_t total_size;
{
uint32_t *p = (uint32_t *) &header[0];
total_size = toku_dtoh32(p[0]);
}
if (total_size == 0 || total_size > (1<<30)) {
errno = toku_db_badformat();
ret = -1;
goto exit;
}
//Cannot use XMALLOC
MALLOC_N(total_size, raw_block);
if (raw_block == nullptr) {
errno = ENOMEM;
ret = -1;
goto exit;
}
readcode = toku_os_read(dbf->fd, raw_block, total_size);
if (readcode < 0) {
ret = -1;
goto exit;
}
if (readcode < total_size) {
errno = TOKUDB_NO_DATA;
ret = -1;
goto exit;
}
struct sub_block sub_block[max_sub_blocks];
uint32_t *sub_block_header;
sub_block_header = (uint32_t *) &raw_block[0];
int32_t n_sub_blocks;
n_sub_blocks = toku_dtoh32(sub_block_header[0]);
sub_block_header++;
size_t size_subblock_header;
size_subblock_header = sub_block_header_size(n_sub_blocks);
if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) {
errno = toku_db_badformat();
ret = -1;
goto exit;
}
for (int i = 0; i < n_sub_blocks; i++) {
sub_block_init(&sub_block[i]);
sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
sub_block_header += 3;
}
// verify sub block sizes
size_t total_compressed_size;
total_compressed_size = 0;
for (int i = 0; i < n_sub_blocks; i++) {
uint32_t compressed_size = sub_block[i].compressed_size;
if (compressed_size<=0 || compressed_size>(1<<30)) {
errno = toku_db_badformat();
ret = -1;
goto exit;
}
uint32_t uncompressed_size = sub_block[i].uncompressed_size;
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
errno = toku_db_badformat();
ret = -1;
goto exit;
}
total_compressed_size += compressed_size;
}
if (total_size != total_compressed_size + size_subblock_header) {
errno = toku_db_badformat();
ret = -1;
goto exit;
}
// sum up the uncompressed size of the sub blocks
size_t uncompressed_size;
uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) {
errno = toku_db_badformat();
ret = -1;
goto exit;
}
unsigned char *uncompressed_data;
uncompressed_data = (unsigned char *)buf;
// point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
unsigned char *compressed_data;
compressed_data = raw_block + size_subblock_header;
// decompress all the compressed sub blocks into the uncompressed buffer
{
int r;
r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool());
if (r != 0) {
fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n", __FUNCTION__, __LINE__, r, raw_block, total_size);
dump_bad_block(raw_block, total_size);
errno = r;
ret = -1;
goto exit;
}
}
ret = uncompressed_size;
exit:
if (raw_block) {
toku_free(raw_block);
}
return ret;
}
static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
size_t count = 0;
while (count + MAX_UNCOMPRESSED_BUF <= bufsize) {
ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count);
if (readcode < 0) {
return readcode;
}
count += readcode;
if (readcode == 0) {
break;
}
}
return count;
}
static void* io_thread (void *v)
// The dbuf_thread does all the asynchronous I/O.
{
......@@ -118,7 +278,13 @@ static void* io_thread (void *v)
toku_mutex_unlock(&bfs->mutex);
//printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd);
{
ssize_t readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize);
ssize_t readcode;
if (bfs->compressed) {
readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize);
}
else {
readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize);
}
//printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode);
if (readcode==-1) {
// a real error. Save the real error.
......@@ -159,11 +325,14 @@ static void* io_thread (void *v)
}
}
int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize) {
int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) {
//printf("%s:%d here\n", __FILE__, __LINE__);
int result = 0;
DBUFIO_FILESET CALLOC(bfs);
if (bfs==0) { result = get_error_errno(); }
bfs->compressed = compressed;
bool mutex_inited = false, cond_inited = false;
if (result==0) {
CALLOC_N(N, bfs->files);
......@@ -190,7 +359,7 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b
for (int i=0; i<N; i++) {
bfs->files[i].fd = fds[i];
bfs->files[i].offset_in_buf = 0;
bfs->files[i].offset_in_file = 0;
bfs->files[i].offset_in_uncompressed_file = 0;
bfs->files[i].next = NULL;
bfs->files[i].second_buf_ready = false;
for (int j=0; j<2; j++) {
......@@ -202,12 +371,18 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b
bfs->files[i].error_code[j] = 0;
}
bfs->files[i].io_done = false;
{
ssize_t r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize);
ssize_t r;
if (bfs->compressed) {
r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize);
} else {
//TODO: 5663 If compressed need to read differently
r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize);
}
{
if (r<0) {
result=get_error_errno();
break;
} else if (r==0) {
} else if (r==0) {
// it's EOF
bfs->files[i].io_done = true;
bfs->n_not_done--;
......@@ -297,7 +472,7 @@ int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t co
// Enough data is present to do it all now
memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count);
dbf->offset_in_buf += count;
dbf->offset_in_file += count;
dbf->offset_in_uncompressed_file += count;
*n_read = count;
return 0;
} else if (dbf->n_in_buf[0] > dbf->offset_in_buf) {
......@@ -306,7 +481,7 @@ int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t co
assert(dbf->offset_in_buf + this_count <= bfs->bufsize);
memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count);
dbf->offset_in_buf += this_count;
dbf->offset_in_file += this_count;
dbf->offset_in_uncompressed_file += this_count;
size_t sub_n_read;
int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read);
if (r==0) {
......
......@@ -14,7 +14,7 @@
/* An implementation would typically use a separate thread or asynchronous I/O to fetch ahead data for each file. The system will typically fill two buffers of size M for each file. One buffer is being read out of using dbuf_read(), and the other buffer is either empty (waiting on the asynchronous I/O to start), being filled in by the asynchronous I/O mechanism, or is waiting for the caller to read data from it. */
typedef struct dbufio_fileset *DBUFIO_FILESET;
int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize);
int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed);
int destroy_dbufio_fileset(DBUFIO_FILESET);
......
......@@ -110,4 +110,7 @@ void toku_ft_set_blackhole(FT_HANDLE ft_handle);
// The difference between the two is MVCC garbage.
void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space);
int get_num_cores(void);
struct toku_thread_pool *get_ft_pool(void);
void dump_bad_block(unsigned char *vp, uint64_t size);
#endif
......@@ -10,6 +10,7 @@
#include <portability/toku_atomic.h>
#include <util/sort.h>
#include <util/threadpool.h>
#include "ft.h"
static FT_UPGRADE_STATUS_S ft_upgrade_status;
......@@ -57,6 +58,14 @@ static inline void do_toku_trace(const char *cp, int len) {
static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *ft_pool = NULL;
int get_num_cores(void) {
return num_cores;
}
struct toku_thread_pool *get_ft_pool(void) {
return ft_pool;
}
void
toku_ft_serialize_layer_init(void) {
num_cores = toku_os_get_number_active_processors();
......@@ -980,7 +989,7 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
// dump a buffer to stderr
// no locking around this for now
static void
void
dump_bad_block(unsigned char *vp, uint64_t size) {
const uint64_t linesize = 64;
uint64_t n = size / linesize;
......@@ -1181,14 +1190,7 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
goto exit;
}
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
toku_decompress(
(Bytef *) sb->uncompressed_ptr,
sb->uncompressed_size,
(Bytef *) sb->compressed_ptr,
sb->compressed_size
);
just_decompress_sub_block(sb);
exit:
return r;
}
......
......@@ -12,6 +12,19 @@
#include <toku_pthread.h>
#include "dbufio.h"
enum { EXTRACTOR_QUEUE_DEPTH = 2,
FILE_BUFFER_SIZE = 1<<24,
MIN_ROWSET_MEMORY = 1<<23,
MIN_MERGE_FANIN = 2,
FRACTAL_WRITER_QUEUE_DEPTH = 3,
FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
DBUFIO_DEPTH = 2,
TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
MIN_MERGE_BUF_SIZE = 1<<20, // always use at least this much
MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE
};
/* These functions are exported to allow the tests to compile. */
/* These structures maintain a collection of all the open temporary files used by the loader. */
......@@ -56,7 +69,7 @@ int init_rowset (struct rowset *rows, uint64_t memory_budget);
void destroy_rowset (struct rowset *rows);
int add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, FTLOADER bl);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl);
int loader_read_row (FILE *f, DBT *key, DBT *val);
struct merge_fileset {
......@@ -146,6 +159,7 @@ struct ft_loader_s {
CACHETABLE cachetable;
bool did_reserve_memory;
bool compress_intermediates;
uint64_t reserved_memory; // how much memory are we allowed to use?
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
......@@ -243,7 +257,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
const char *temp_file_template,
LSN load_lsn,
TOKUTXN txn,
bool reserve_memory);
bool reserve_memory,
bool compress_intermediates);
void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
......
......@@ -47,18 +47,6 @@ static uint32_t size_factor = 1024;
static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE;
static uint32_t default_loader_basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
enum { EXTRACTOR_QUEUE_DEPTH = 2,
FILE_BUFFER_SIZE = 1<<24,
MIN_ROWSET_MEMORY = 1<<23,
MIN_MERGE_FANIN = 2,
FRACTAL_WRITER_QUEUE_DEPTH = 3,
FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
DBUFIO_DEPTH = 2,
TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much
};
void
toku_ft_loader_set_size_factor(uint32_t factor) {
// For test purposes only
......@@ -468,7 +456,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
const char *temp_file_template,
LSN load_lsn,
TOKUTXN txn,
bool reserve_memory)
bool reserve_memory,
bool compress_intermediates)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
{
FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
......@@ -484,6 +473,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
bl->did_reserve_memory = false;
bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
}
bl->compress_intermediates = compress_intermediates;
//printf("Reserved memory=%ld\n", bl->reserved_memory);
bl->src_db = src_db;
......@@ -570,7 +560,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
const char *temp_file_template,
LSN load_lsn,
TOKUTXN txn,
bool reserve_memory)
bool reserve_memory,
bool compress_intermediates)
/* Effect: called by DB_ENV->create_loader to create a brt loader.
* Arguments:
* blp Return the brt loader here.
......@@ -592,7 +583,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
temp_file_template,
load_lsn,
txn,
reserve_memory);
reserve_memory,
compress_intermediates);
if (r!=0) result = r;
}
if (result==0) {
......@@ -608,8 +600,12 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
return result;
}
static void ft_loader_set_panic(FTLOADER bl, int error, bool callback) {
int r = ft_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
static void ft_loader_set_panic(FTLOADER bl, int error, bool callback, int which_db, DBT *key, DBT *val) {
DB *db = nullptr;
if (bl && bl->dbs && which_db >= 0 && which_db < bl->N) {
db = bl->dbs[which_db];
}
int r = ft_loader_set_error(&bl->error_callback, error, db, which_db, key, val);
if (r == 0 && callback)
ft_loader_call_error_function(&bl->error_callback);
}
......@@ -624,26 +620,134 @@ FILE *toku_bl_fidx2file (FTLOADER bl, FIDX i) {
return result;
}
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, FTLOADER UU(bl))
static int bl_finish_compressed_write(FILE *stream, struct wbuf *wb) {
int r;
char *compressed_buf = NULL;
const size_t data_size = wb->ndone;
invariant(data_size > 0);
invariant(data_size <= MAX_UNCOMPRESSED_BUF);
int n_sub_blocks = 0;
int sub_block_size = 0;
r = choose_sub_block_size(wb->ndone, max_sub_blocks, &sub_block_size, &n_sub_blocks);
invariant(r==0);
invariant(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
invariant(sub_block_size > 0);
struct sub_block sub_block[max_sub_blocks];
// set the initial sub block size for all of the sub blocks
for (int i = 0; i < n_sub_blocks; i++) {
sub_block_init(&sub_block[i]);
}
set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block);
size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, TOKU_DEFAULT_COMPRESSION_METHOD);
const size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
const size_t other_overhead = sizeof(uint32_t); //total_size
const size_t header_len = sub_block_header_len + other_overhead;
MALLOC_N(header_len + compressed_len, compressed_buf);
if (compressed_buf == nullptr) {
return ENOMEM;
}
// compress all of the sub blocks
char *uncompressed_ptr = (char*)wb->buf;
char *compressed_ptr = compressed_buf + header_len;
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr,
get_num_cores(), get_ft_pool(), TOKU_DEFAULT_COMPRESSION_METHOD);
//total_size does NOT include itself
uint32_t total_size = compressed_len + sub_block_header_len;
// serialize the sub block header
uint32_t *ptr = (uint32_t *)(compressed_buf);
*ptr++ = toku_htod32(total_size);
*ptr++ = toku_htod32(n_sub_blocks);
for (int i=0; i<n_sub_blocks; i++) {
ptr[0] = toku_htod32(sub_block[i].compressed_size);
ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
ptr[2] = toku_htod32(sub_block[i].xsum);
ptr += 3;
}
// Mark as written
wb->ndone = 0;
size_t size_to_write = total_size + 4; // Includes writing total_size
{
size_t written = do_fwrite(compressed_buf, 1, size_to_write, stream);
if (written!=size_to_write) {
if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ...
r = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno
else
r = ferror(stream);
invariant(r!=0);
goto exit;
}
}
r = 0;
exit:
if (compressed_buf) {
toku_free(compressed_buf);
}
return r;
}
static int bl_compressed_write(void *ptr, size_t nbytes, FILE *stream, struct wbuf *wb) {
invariant(wb->size <= MAX_UNCOMPRESSED_BUF);
size_t bytes_left = nbytes;
char *buf = (char*)ptr;
while (bytes_left > 0) {
size_t bytes_to_copy = bytes_left;
if (wb->ndone + bytes_to_copy > wb->size) {
bytes_to_copy = wb->size - wb->ndone;
}
wbuf_nocrc_literal_bytes(wb, buf, bytes_to_copy);
if (wb->ndone == wb->size) {
//Compress, write to disk, and empty out wb
int r = bl_finish_compressed_write(stream, wb);
if (r != 0) {
errno = r;
return -1;
}
wb->ndone = 0;
}
bytes_left -= bytes_to_copy;
buf += bytes_to_copy;
}
return 0;
}
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, struct wbuf *wb, FTLOADER bl)
/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number.
* Arguments:
* ptr the data to be writen.
* size the amount of data to be written.
* nmemb the number of units of size to be written.
* stream write the data here.
* wb where to write uncompressed data (if we're compressing) or ignore if NULL
* bl passed so we can panic the ft_loader if something goes wrong (recording the error number).
* Return value: 0 on success, an error number otherwise.
*/
{
size_t r = do_fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
int e;
if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ...
e = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno
else
e = ferror(stream);
invariant(e!=0);
return e;
if (!bl->compress_intermediates || !wb) {
size_t r = do_fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
int e;
if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ...
e = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno
else
e = ferror(stream);
invariant(e!=0);
return e;
}
} else {
size_t num_bytes = size * nmemb;
int r = bl_compressed_write(ptr, num_bytes, stream, wb);
if (r != 0) {
return r;
}
}
return 0;
}
......@@ -674,12 +778,12 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream)
}
}
static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, FTLOADER bl)
static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl)
{
int r;
int dlen = dbt->size;
if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, bl))) return r;
if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, bl))) return r;
if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, wb, bl))) return r;
if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, wb, bl))) return r;
if (dataoff)
*dataoff += dlen + sizeof(dlen);
return 0;
......@@ -741,12 +845,13 @@ static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int file
}
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, FTLOADER bl)
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl)
/* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date.
* Arguments:
* key, val write these.
* data the file to write them to
* dataoff a pointer to a counter that keeps track of the amount of data written so far.
* wb a pointer (possibly NULL) to buffer uncompressed output
* bl the ft_loader (passed so we can panic if needed).
* Return value: 0 on success, an error number otherwise.
*/
......@@ -755,8 +860,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *datao
//int vlen = val->size;
int r;
// we have a chance to handle the errors because when we close we can delete all the files.
if ((r=bl_write_dbt(key, dataf, dataoff, bl))) return r;
if ((r=bl_write_dbt(val, dataf, dataoff, bl))) return r;
if ((r=bl_write_dbt(key, dataf, dataoff, wb, bl))) return r;
if ((r=bl_write_dbt(val, dataf, dataoff, wb, bl))) return r;
toku_mutex_lock(&bl->file_infos.lock);
bl->file_infos.file_infos[data.idx].n_rows++;
toku_mutex_unlock(&bl->file_infos.lock);
......@@ -948,7 +1053,7 @@ static void* extractor_thread (void *blv) {
{
r = process_primary_rows(bl, primary_rowset);
if (r)
ft_loader_set_panic(bl, r, false);
ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
}
}
......@@ -956,7 +1061,7 @@ static void* extractor_thread (void *blv) {
if (r == 0) {
r = finish_primary_rows(bl);
if (r)
ft_loader_set_panic(bl, r, false);
ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
}
return NULL;
......@@ -1086,31 +1191,41 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro
pkey.size = prow->klen;
pval.data = primary_rowset->data + prow->off + prow->klen;
pval.size = prow->vlen;
DBT *dest_key = &skey;
DBT *dest_val = &sval;
{
int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval);
if (r != 0) {
error_codes[i] = r;
inc_error_count();
break;
int r;
if (bl->dbs[i] != bl->src_db) {
r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, dest_key, dest_val, &pkey, &pval);
if (r != 0) {
error_codes[i] = r;
inc_error_count();
break;
}
} else {
dest_key = &pkey;
dest_val = &pval;
}
if (skey.size > klimit) {
if (dest_key->size > klimit) {
error_codes[i] = EINVAL;
fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", skey.size, klimit);
fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", dest_key->size, klimit);
inc_error_count();
break;
}
if (sval.size > vlimit) {
if (dest_val->size > vlimit) {
error_codes[i] = EINVAL;
fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", sval.size, vlimit);
fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", dest_val->size, vlimit);
inc_error_count();
break;
}
}
bl->extracted_datasizes[i] += ft_loader_leafentry_size(skey.size, sval.size, leafentry_xid(bl, i));
bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i));
if (row_wont_fit(rows, skey.size + sval.size)) {
if (row_wont_fit(rows, dest_key->size + dest_val->size)) {
//printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
......@@ -1121,7 +1236,7 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro
break;
}
}
int r = add_row(rows, &skey, &sval);
int r = add_row(rows, dest_key, dest_val);
if (r != 0) {
error_codes[i] = r;
inc_error_count();
......@@ -1142,7 +1257,7 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro
}
}
{
if (bl->dbs[i] != bl->src_db) {
if (skey.flags) {
toku_free(skey.data); skey.data = NULL;
}
......@@ -1157,7 +1272,10 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro
int r = 0;
if (error_count > 0) {
for (int i=0; i<bl->N; i++) {
if (error_codes[i]) r = error_codes[i];
if (error_codes[i]) {
r = error_codes[i];
ft_loader_set_panic(bl, r, false, i, nullptr, nullptr);
}
}
invariant(r); // found the error
}
......@@ -1457,16 +1575,42 @@ static int update_progress (int N,
static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) {
int r = 0;
// Allocate a buffer if we're compressing intermediates.
char *uncompressed_buffer = nullptr;
if (bl->compress_intermediates) {
MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
if (uncompressed_buffer == nullptr) {
return ENOMEM;
}
}
struct wbuf wb;
wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
FILE *sstream = toku_bl_fidx2file(bl, sfile);
for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
uint64_t soffset=0; // don't really need this.
int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r != 0) return r;
r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, &wb, bl);
if (r != 0) {
goto exit;
}
}
return 0;
if (bl->compress_intermediates && wb.ndone > 0) {
r = bl_finish_compressed_write(sstream, &wb);
if (r != 0) {
goto exit;
}
}
r = 0;
exit:
if (uncompressed_buffer) {
toku_free(uncompressed_buffer);
}
return r;
}
......@@ -1621,6 +1765,17 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q
int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) result = r;
}
// Allocate a buffer if we're compressing intermediates.
char *uncompressed_buffer = nullptr;
struct wbuf wb;
if (bl->compress_intermediates && !to_q) {
MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
if (uncompressed_buffer == nullptr) {
result = ENOMEM;
}
}
wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
//printf(" n_rows=%ld\n", n_rows);
while (result==0 && pqueue_size(pq)>0) {
......@@ -1663,7 +1818,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q
}
} else {
// write it to the dest file
int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], &wb, bl);
if (r!=0) {
result = r;
break;
......@@ -1710,6 +1865,10 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q
if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
}
}
if (result == 0 && uncompressed_buffer != nullptr && wb.ndone > 0) {
result = bl_finish_compressed_write(dest_stream, &wb);
}
if (result==0 && to_q) {
int r = queue_enq(q, (void*)output_rowset, 1, NULL);
if (r!=0)
......@@ -1719,6 +1878,9 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q
}
// cleanup
if (uncompressed_buffer) {
toku_free(uncompressed_buffer);
}
for (int i=0; i<n_sources; i++) {
toku_free(keys[i].data); keys[i].data = NULL;
toku_free(vals[i].data); vals[i].data = NULL;
......@@ -1754,7 +1916,8 @@ static int merge_some_files (const bool to_q, FIDX dest_data, QUEUE q, int n_sou
}
}
if (result==0) {
int r = create_dbufio_fileset(&bfs, n_sources, fds, memory_per_rowset_during_merge(bl, n_sources, to_q));
int r = create_dbufio_fileset(&bfs, n_sources, fds,
memory_per_rowset_during_merge(bl, n_sources, to_q), bl->compress_intermediates);
if (r!=0) { result = r; }
}
......@@ -1895,7 +2058,7 @@ int merge_files (struct merge_fileset *fs,
if (result!=0) break;
}
if (result) ft_loader_set_panic(bl, result, true);
if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr);
{
int r = queue_eof(output_q);
......@@ -2249,7 +2412,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
int rr = queue_deq(q, &item, NULL, NULL);
if (rr == EOF) break;
if (rr != 0) {
ft_loader_set_panic(bl, rr, true);
ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr);
break;
}
}
......@@ -2283,8 +2446,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
n_pivots++;
invariant(maxkey.data != NULL);
if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) {
ft_loader_set_panic(bl, r, true);
if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, nullptr, bl))) {
ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
if (result == 0) result = r;
break;
}
......@@ -2294,7 +2457,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
r = allocate_block(&out, &lblock);
if (r != 0) {
ft_loader_set_panic(bl, r, true);
ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
if (result == 0) result = r;
break;
}
......@@ -2346,7 +2509,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
{
DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file.
r = bl_write_dbt(&key, pivots_stream, NULL, bl);
r = bl_write_dbt(&key, pivots_stream, NULL, nullptr, bl);
if (r) {
result = r; goto error;
}
......@@ -2723,7 +2886,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
result = update_progress(progress_allocation, bl, "wrote node");
if (result)
ft_loader_set_panic(bl, result, true);
ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
}
static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
......@@ -2833,7 +2996,7 @@ static int setup_nonleaf_block (int n_children,
if (result == 0) {
FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl);
int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, nullptr, bl);
if (r)
result = r;
}
......@@ -2933,7 +3096,7 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum
toku_free(subtree_info);
if (result != 0)
ft_loader_set_panic(bl, result, true);
ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
}
static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {
......
......@@ -26,7 +26,8 @@ int toku_ft_loader_open (FTLOADER *bl,
const char *temp_file_template,
LSN load_lsn,
TOKUTXN txn,
bool reserve_memory);
bool reserve_memory,
bool compress_intermediates);
int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val);
......
......@@ -44,7 +44,7 @@ sub_block_header_size(int n_sub_blocks);
void
set_compressed_size_bound(struct sub_block *se, enum toku_compression_method method);
// get the sum of the sub block compressed sizes
// get the sum of the sub block compressed bound sizes
size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[], enum toku_compression_method method);
......
......@@ -42,7 +42,7 @@ static void test1 (size_t chars_per_file, size_t UU(bytes_per_read)) {
}
DBUFIO_FILESET bfs;
{
int r = create_dbufio_fileset(&bfs, N, fds, M);
int r = create_dbufio_fileset(&bfs, N, fds, M, false);
assert(r==0);
}
......
......@@ -41,7 +41,7 @@ static void test1 (size_t chars_per_file, size_t bytes_per_read) {
}
DBUFIO_FILESET bfs;
{
int r = create_dbufio_fileset(&bfs, N, fds, M);
int r = create_dbufio_fileset(&bfs, N, fds, M, false);
assert(r==0);
}
while (n_live>0) {
......
......@@ -87,7 +87,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) {
}
FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true);
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false);
assert(r == 0);
struct rowset *rowset[nrowsets];
......
......@@ -99,7 +99,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char
sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true);
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false);
assert(r == 0);
struct rowset *rowset[nrowsets];
......
......@@ -319,7 +319,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) {
sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
FTLOADER loader;
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true);
r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true, false);
assert(r == 0);
struct rowset *rowset[nrowsets];
......
......@@ -326,7 +326,7 @@ static void test (const char *directory, bool is_error) {
bt_compare_functions,
"tempxxxxxx",
*lsnp,
TXNID_NONE, true);
TXNID_NONE, true, false);
assert(r==0);
}
......@@ -340,7 +340,7 @@ static void test (const char *directory, bool is_error) {
{ int r = queue_create(&q, 1000); assert(r==0); }
DBUFIO_FILESET bfs;
const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
{ int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE); assert(r==0); }
{ int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); }
FIDX *XMALLOC_N(N_SOURCES, src_fidxs);
assert(bl->file_infos.n_files==0);
bl->file_infos.n_files = N_SOURCES;
......
......@@ -57,7 +57,7 @@ static void test_loader_open(int ndbs) {
for (i = 0; ; i++) {
set_my_malloc_trigger(i+1);
r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true);
r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true, false);
if (r == 0)
break;
}
......
......@@ -187,7 +187,7 @@ static void test_read_write_rows (char *tf_template) {
toku_fill_dbt(&key, keystrings[i], strlen(keystrings[i]));
DBT val;
toku_fill_dbt(&val, valstrings[i], strlen(valstrings[i]));
r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, &bl);
r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, nullptr, &bl);
CKERR(r);
actual_size+=key.size + val.size + 8;
}
......
......@@ -430,3 +430,8 @@ int toku_fsync_directory(const char *fname) {
toku_free(dirname);
return result;
}
FILE *toku_os_fmemopen(void *buf, size_t size, const char *mode) {
return fmemopen(buf, size, mode);
}
......@@ -174,6 +174,7 @@ toku_loader_create_loader(DB_ENV *env,
DB_LOADER *loader = NULL;
bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS);
bool compress_intermediates = (loader_flags & LOADER_COMPRESS_INTERMEDIATES) != 0;
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
......@@ -252,7 +253,8 @@ toku_loader_create_loader(DB_ENV *env,
loader->i->temp_file_template,
load_lsn,
ttxn,
puts_allowed);
puts_allowed,
compress_intermediates);
if ( rval!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
......
......@@ -546,14 +546,22 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb)
add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb)
add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb)
add_test(ydb/loader-stress-test5.tdb loader-stress-test.tdb -c -z -e dir.loader-stress-test5.tdb)
add_test(ydb/loader-stress-test0z.tdb loader-stress-test.tdb -c -e dir.loader-stress-test0z.tdb -z)
add_test(ydb/loader-stress-test1z.tdb loader-stress-test.tdb -c -p -e dir.loader-stress-test1z.tdb -z)
add_test(ydb/loader-stress-test2z.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2z.tdb -z)
add_test(ydb/loader-stress-test3z.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3z.tdb -z)
add_test(ydb/loader-stress-test4z.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4z.tdb -z)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
dir.loader-stress-test0.tdb
dir.loader-stress-test1.tdb
dir.loader-stress-test2.tdb
dir.loader-stress-test3.tdb
dir.loader-stress-test4.tdb
dir.loader-stress-test5.tdb
dir.loader-stress-test0z.tdb
dir.loader-stress-test1z.tdb
dir.loader-stress-test2z.tdb
dir.loader-stress-test3z.tdb
dir.loader-stress-test4z.tdb
)
list(REMOVE_ITEM loader_tests loader-dup-test.loader)
......@@ -563,6 +571,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
add_test(ydb/loader-dup-test3.tdb loader-dup-test.tdb -d 1 -s -r 100 -e dir.loader-dup-test3.tdb)
add_test(ydb/loader-dup-test4.tdb loader-dup-test.tdb -d 1 -s -r 1000 -e dir.loader-dup-test4.tdb)
add_test(ydb/loader-dup-test5.tdb loader-dup-test.tdb -d 1 -s -r 1000 -E -e dir.loader-dup-test5.tdb)
add_test(ydb/loader-dup-test0z.tdb loader-dup-test.tdb -e dir.loader-dup-test0z.tdb -z)
add_test(ydb/loader-dup-test1z.tdb loader-dup-test.tdb -d 1 -r 500000 -e dir.loader-dup-test1z.tdb -z)
add_test(ydb/loader-dup-test2z.tdb loader-dup-test.tdb -d 1 -r 1000000 -e dir.loader-dup-test2z.tdb -z)
add_test(ydb/loader-dup-test3z.tdb loader-dup-test.tdb -d 1 -s -r 100 -e dir.loader-dup-test3z.tdb -z)
add_test(ydb/loader-dup-test4z.tdb loader-dup-test.tdb -d 1 -s -r 1000 -e dir.loader-dup-test4z.tdb -z)
add_test(ydb/loader-dup-test5z.tdb loader-dup-test.tdb -d 1 -s -r 1000 -E -e dir.loader-dup-test5z.tdb -z)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
dir.loader-dup-test0.tdb
dir.loader-dup-test1.tdb
......@@ -570,6 +584,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
dir.loader-dup-test3.tdb
dir.loader-dup-test4.tdb
dir.loader-dup-test5.tdb
dir.loader-dup-test0z.tdb
dir.loader-dup-test1z.tdb
dir.loader-dup-test2z.tdb
dir.loader-dup-test3z.tdb
dir.loader-dup-test4z.tdb
dir.loader-dup-test5z.tdb
)
## as part of #4503, we took out test 1 and 3
......@@ -578,6 +598,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
#add_test(ydb/loader-cleanup-test1.tdb loader-cleanup-test.tdb -s -r 800 -p -e dir.loader-cleanup-test1.tdb)
add_test(ydb/loader-cleanup-test2.tdb loader-cleanup-test.tdb -s -r 8000 -e dir.loader-cleanup-test2.tdb)
#add_test(ydb/loader-cleanup-test3.tdb loader-cleanup-test.tdb -s -r 8000 -p -e dir.loader-cleanup-test3.tdb)
add_test(ydb/loader-cleanup-test0z.tdb loader-cleanup-test.tdb -s -r 800 -e dir.loader-cleanup-test0z.tdb -z)
add_test(ydb/loader-cleanup-test2z.tdb loader-cleanup-test.tdb -s -r 8000 -e dir.loader-cleanup-test2z.tdb -z)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
dir.loader-cleanup-test0.tdb
dir.loader-cleanup-test1.tdb
......@@ -606,11 +628,15 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
)
declare_custom_tests(maxsize-for-loader.tdb)
add_test(ydb/maxsize-for-loader-A.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-A.tdb -f)
add_test(ydb/maxsize-for-loader-B.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-B.tdb)
add_test(ydb/maxsize-for-loader-A.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-A.tdb -f -c)
add_test(ydb/maxsize-for-loader-B.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-B.tdb -c)
add_test(ydb/maxsize-for-loader-Az.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-Az.tdb -f -z -c)
add_test(ydb/maxsize-for-loader-Bz.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-Bz.tdb -z -c)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
dir.maxsize-for-loader-A.tdb
dir.maxsize-for-loader-B.tdb
dir.maxsize-for-loader-Az.tdb
dir.maxsize-for-loader-Bz.tdb
)
declare_custom_tests(hotindexer-undo-do-test.tdb)
......@@ -667,6 +693,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
ydb/env-put-multiple.tdb
ydb/filesize.tdb
ydb/loader-cleanup-test0.tdb
ydb/loader-cleanup-test0z.tdb
ydb/loader-cleanup-test2z.tdb
ydb/manyfiles.tdb
ydb/recover-loader-test.abortrecover
ydb/recovery_fileops_stress.tdb
......@@ -712,6 +740,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
ydb/loader-stress-del.nop.loader
ydb/loader-stress-del.p.loader
ydb/loader-stress-del.comp.loader
ydb/loader-stress-test4z.tdb
ydb/test3039.tdb
ydb/test3529.tdb
ydb/test_update_stress.tdb
......
......@@ -7,18 +7,24 @@
#include "toku_pthread.h"
#include <db.h>
#include <sys/stat.h>
#include "toku_random.h"
using namespace toku;
bool fast = false;
DB_ENV *env;
enum {NUM_DBS=2};
int USE_PUTS=0;
uint32_t USE_COMPRESS=0;
bool do_check = false;
uint32_t num_rows = 1;
uint32_t which_db_to_fail = (uint32_t) -1;
uint32_t which_row_to_fail = (uint32_t) -1;
enum how_to_fail { FAIL_NONE, FAIL_KSIZE, FAIL_VSIZE } how_to_fail = FAIL_NONE;
static struct random_data random_data[NUM_DBS];
char random_buf[NUM_DBS][8];
static int put_multiple_generate(DB *dest_db,
DB *src_db __attribute__((__unused__)),
DBT *dest_key, DBT *dest_val,
......@@ -52,8 +58,8 @@ static int put_multiple_generate(DB *dest_db,
dest_val->ulen = vsize;
}
assert(ksize>=sizeof(uint32_t));
for (uint32_t i=0; i<ksize; i++) ((char*)dest_key->data)[i] = random();
for (uint32_t i=0; i<vsize; i++) ((char*)dest_val->data)[i] = random();
for (uint32_t i=0; i<ksize; i++) ((char*)dest_key->data)[i] = myrandom_r(&random_data[which]);
for (uint32_t i=0; i<vsize; i++) ((char*)dest_val->data)[i] = myrandom_r(&random_data[which]);
*(uint32_t*)dest_key->data = rownum;
dest_key->size = ksize;
dest_val->size = vsize;
......@@ -74,7 +80,18 @@ static void error_callback (DB *db __attribute__((__unused__)), int which_db, in
e->error_count++;
}
static void test_loader_maxsize(DB **dbs)
static void reset_random(void) {
int r;
for (int i = 0; i < NUM_DBS; i++) {
ZERO_STRUCT(random_data[i]);
ZERO_ARRAY(random_buf[i]);
r = myinitstate_r(i, random_buf[i], 8, &random_data[i]);
assert(r==0);
}
}
static void test_loader_maxsize(DB **dbs, DB **check_dbs)
{
int r;
DB_TXN *txn;
......@@ -85,10 +102,10 @@ static void test_loader_maxsize(DB **dbs)
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = USE_COMPRESS; // set with -p option
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
......@@ -98,6 +115,7 @@ static void test_loader_maxsize(DB **dbs)
r = loader->set_poll_function(loader, NULL, NULL);
CKERR(r);
reset_random();
// using loader->put, put values into DB
DBT key, val;
unsigned int k, v;
......@@ -107,13 +125,7 @@ static void test_loader_maxsize(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val);
if (USE_PUTS) {
//PUT loader can return -1 if it finds an error during the puts.
CKERR2s(r, 0,-1);
}
else {
CKERR(r);
}
CKERR(r);
}
// close the loader
......@@ -127,16 +139,124 @@ static void test_loader_maxsize(DB **dbs)
}
assert(0);
checked:
r = txn->commit(txn, 0);
CKERR(r);
if (do_check && how_to_fail==FAIL_NONE) {
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
reset_random();
DBT keys[NUM_DBS];
DBT vals[NUM_DBS];
uint32_t flags[NUM_DBS];
for (int i = 0; i < NUM_DBS; i++) {
dbt_init_realloc(&keys[i]);
dbt_init_realloc(&vals[i]);
flags[i] = 0;
}
for(uint32_t i=0;i<num_rows;i++) {
k = i;
v = i;
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
DB* src_db = check_dbs[0];
r = env->put_multiple(env, src_db, txn, &key, &val, NUM_DBS, check_dbs, keys, vals, flags);
CKERR(r);
}
r = txn->commit(txn, 0);
CKERR(r);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
for (int i = 0; i < NUM_DBS; i++) {
DBC *loader_cursor;
DBC *check_cursor;
r = dbs[i]->cursor(dbs[i], txn, &loader_cursor, 0);
CKERR(r);
r = dbs[i]->cursor(check_dbs[i], txn, &check_cursor, 0);
CKERR(r);
DBT loader_key;
DBT loader_val;
DBT check_key;
DBT check_val;
dbt_init_realloc(&loader_key);
dbt_init_realloc(&loader_val);
dbt_init_realloc(&check_key);
dbt_init_realloc(&check_val);
for (uint32_t x = 0; x <= num_rows; x++) {
int r_loader = loader_cursor->c_get(loader_cursor, &loader_key, &loader_val, DB_NEXT);
int r_check = check_cursor->c_get(check_cursor, &check_key, &check_val, DB_NEXT);
assert(r_loader == r_check);
if (x == num_rows) {
CKERR2(r_loader, DB_NOTFOUND);
CKERR2(r_check, DB_NOTFOUND);
} else {
CKERR(r_loader);
CKERR(r_check);
}
assert(loader_key.size == check_key.size);
assert(loader_val.size == check_val.size);
assert(memcmp(loader_key.data, check_key.data, loader_key.size) == 0);
assert(memcmp(loader_val.data, check_val.data, loader_val.size) == 0);
}
toku_free(loader_key.data);
toku_free(loader_val.data);
toku_free(check_key.data);
toku_free(check_val.data);
loader_cursor->c_close(loader_cursor);
check_cursor->c_close(check_cursor);
}
for (int i = 0; i < NUM_DBS; i++) {
toku_free(keys[i].data);
toku_free(vals[i].data);
dbt_init_realloc(&keys[i]);
dbt_init_realloc(&vals[i]);
}
r = txn->commit(txn, 0);
CKERR(r);
}
}
char *free_me = NULL;
const char *env_dir = ENVDIR; // the default env_dir
static void create_and_open_dbs(DB **dbs, const char *suffix, int *idx) {
int r;
DBT desc;
dbt_init(&desc, "foo", sizeof("foo"));
enum {MAX_NAME=128};
char name[MAX_NAME*2];
for(int i=0;i<NUM_DBS;i++) {
idx[i] = i;
r = db_create(&dbs[i], env, 0); CKERR(r);
dbs[i]->app_private = &idx[i];
snprintf(name, sizeof(name), "db_%04x_%s", i, suffix);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
{ int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
});
}
}
static int
uint_or_size_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
assert(db && a && b);
if (a->size == sizeof(unsigned int) && b->size == sizeof(unsigned int)) {
return uint_dbt_cmp(db, a, b);
}
return a->size - b->size;
}
static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) {
num_rows = nr; which_db_to_fail = wdb; which_row_to_fail = wrow; how_to_fail = htf;
//since src_key/val can't be modified, and we're using generate to make the failures, we can't fail on src_db (0)
assert(which_db_to_fail != 0);
int r;
{
int len = strlen(env_dir) + 20;
......@@ -148,7 +268,7 @@ static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail
r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r);
r = env->set_default_bt_compare(env, uint_or_size_dbt_cmp); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
......@@ -157,37 +277,35 @@ static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail
//Disable auto-checkpointing
r = env->checkpointing_set_period(env, 0); CKERR(r);
DBT desc;
dbt_init(&desc, "foo", sizeof("foo"));
enum {MAX_NAME=128};
char name[MAX_NAME*2];
DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
assert(dbs != NULL);
DB **XMALLOC_N(NUM_DBS, dbs);
DB **check_dbs;
int idx[NUM_DBS];
for(int i=0;i<NUM_DBS;i++) {
idx[i] = i;
r = db_create(&dbs[i], env, 0); CKERR(r);
dbs[i]->app_private = &idx[i];
snprintf(name, sizeof(name), "db_%04x", i);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
{ int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
});
create_and_open_dbs(dbs, "loader", &idx[0]);
if (do_check && how_to_fail==FAIL_NONE) {
XMALLOC_N(NUM_DBS, check_dbs);
create_and_open_dbs(check_dbs, "check", &idx[0]);
}
if (verbose) printf("running test_loader()\n");
// -------------------------- //
test_loader_maxsize(dbs);
test_loader_maxsize(dbs, check_dbs);
// -------------------------- //
if (verbose) printf("done test_loader()\n");
for(int i=0;i<NUM_DBS;i++) {
dbs[i]->close(dbs[i], 0); CKERR(r);
dbs[i] = NULL;
if (do_check && how_to_fail==FAIL_NONE) {
check_dbs[i]->close(check_dbs[i], 0); CKERR(r);
check_dbs[i] = NULL;
}
}
r = env->close(env, 0); CKERR(r);
toku_free(dbs);
if (do_check && how_to_fail==FAIL_NONE) {
toku_free(check_dbs);
}
}
// ------------ infrastructure ----------
......@@ -199,12 +317,12 @@ int test_main(int argc, char * const *argv) {
do_args(argc, argv);
run_test(1, (uint32_t) -1, (uint32_t) -1, FAIL_NONE);
run_test(1, 0, 0, FAIL_NONE);
run_test(1, 0, 0, FAIL_KSIZE);
run_test(1, 0, 0, FAIL_VSIZE);
run_test(1, 1, 0, FAIL_NONE);
run_test(1, 1, 0, FAIL_KSIZE);
run_test(1, 1, 0, FAIL_VSIZE);
if (!fast) {
run_test(1000000, 0, 500000, FAIL_KSIZE);
run_test(1000000, 0, 500000, FAIL_VSIZE);
run_test(1000000, 1, 500000, FAIL_KSIZE);
run_test(1000000, 1, 500000, FAIL_VSIZE);
}
toku_free(free_me);
return 0;
......@@ -223,7 +341,8 @@ static void do_args(int argc, char * const argv[]) {
fprintf(stderr, " -h help\n");
fprintf(stderr, " -v verbose\n");
fprintf(stderr, " -q quiet\n");
fprintf(stderr, " -p use DB->put\n");
fprintf(stderr, " -z compress intermediates\n");
fprintf(stderr, " -c compare with regular dbs\n");
fprintf(stderr, " -f fast (suitable for vgrind)\n");
exit(resultcode);
} else if (strcmp(argv[0], "-e")==0) {
......@@ -235,13 +354,15 @@ static void do_args(int argc, char * const argv[]) {
assert(r<len);
env_dir = toku_strdup(full_env_dir);
free_me = (char *) env_dir;
} else if (strcmp(argv[0], "-c")==0) {
do_check = true;
} else if (strcmp(argv[0], "-v")==0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
} else if (strcmp(argv[0], "-z")==0) {
USE_COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-f")==0) {
fast = true;
} else {
......
......@@ -266,6 +266,7 @@ int toku_os_close(int fd);
int toku_os_fclose(FILE * stream);
ssize_t toku_os_read(int fd, void *buf, size_t count);
ssize_t toku_os_pread(int fd, void *buf, size_t count, off_t offset);
FILE *toku_os_fmemopen(void *buf, size_t size, const char *mode);
// wrapper around fsync
void toku_file_fsync_without_accounting(int fd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment