ydb.c 66.3 KB
Newer Older
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
1
/* -*- mode: C; c-basic-offset: 4 -*- */
2
#ident "Copyright (c) 2007, 2008 Tokutek Inc.  All rights reserved."
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
3

4 5 6
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

const char *toku_patent_string = "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it.";
7
const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc.  All rights reserved.";
8

Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
9 10 11 12
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdarg.h>
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
13 14 15 16 17
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/fcntl.h>
#include <sys/stat.h>
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
18
#include <sys/types.h>
Yoni Fogel's avatar
Yoni Fogel committed
19
#include <ctype.h>
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
20
#include <unistd.h>
Yoni Fogel's avatar
Yoni Fogel committed
21
#include <libgen.h>
Rich Prohaska's avatar
Rich Prohaska committed
22
#include <pthread.h>
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
23

24 25
#include "ydb-internal.h"

26
#include "brt-internal.h"
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
27
#include "cachetable.h"
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
28 29
#include "log.h"
#include "memory.h"
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
30

Rich Prohaska's avatar
Rich Prohaska committed
31 32 33 34
/* the ydb big lock serializes access to the tokudb
   every call (including methods) into the tokudb library gets the lock 
   no internal function should invoke a method through an object */

35
#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
Rich Prohaska's avatar
Rich Prohaska committed
36
static pthread_mutex_t ydb_big_lock = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
37 38
#else
static pthread_mutex_t ydb_big_lock = PTHREAD_MUTEX_INITIALIZER;
Yoni Fogel's avatar
Yoni Fogel committed
39
#endif
Rich Prohaska's avatar
Rich Prohaska committed
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65

/* Acquire the ydb big lock.  A failing pthread_mutex_lock() trips the assert. */
static inline void ydb_lock() {
    int lock_rc = pthread_mutex_lock(&ydb_big_lock);
    assert(lock_rc == 0);
}

/* Release the ydb big lock.  A failing pthread_mutex_unlock() trips the assert. */
static inline void ydb_unlock() {
    int unlock_rc = pthread_mutex_unlock(&ydb_big_lock);
    assert(unlock_rc == 0);
}

/* the ydb reference is used to cleanup the library when there are no more references to it */
static int toku_ydb_refs = 0;

/* Take one more reference on the library (bumps toku_ydb_refs). */
static inline void ydb_add_ref() {
    toku_ydb_refs += 1;
}

/* Drop one reference on the library.  The count must be positive on entry.
   When the last reference goes away the global destructors are run. */
static inline void ydb_unref() {
    assert(toku_ydb_refs > 0);
    toku_ydb_refs -= 1;
    if (toku_ydb_refs != 0) {
        return;
    }
    /* call global destructors */
    toku_malloc_cleanup();
}

/* env methods */
static int toku_env_close(DB_ENV *env, u_int32_t flags);
66 67 68
static int toku_env_set_data_dir(DB_ENV * env, const char *dir);
static int toku_env_set_lg_dir(DB_ENV * env, const char *dir);
static int toku_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir);
Rich Prohaska's avatar
Rich Prohaska committed
69 70

/* Take one more reference on the environment (bumps env->i->ref_count). */
static inline void env_add_ref(DB_ENV *env) {
    env->i->ref_count += 1;
}

/* Drop one reference on the environment.  The count must be positive on entry.
   When it reaches zero the environment is closed with toku_env_close(env, 0),
   whose return value is ignored here. */
static inline void env_unref(DB_ENV *env) {
    assert(env->i->ref_count > 0);
    env->i->ref_count -= 1;
    if (env->i->ref_count == 0) {
        toku_env_close(env, 0);
    }
}

/* Return 1 if the environment has a cachetable (i.e. it has been opened), else 0. */
static inline int env_opened(DB_ENV *env) {
    if (env->i->cachetable != 0) {
        return 1;
    }
    return 0;
}

/* Return nonzero (1) if the environment is panicked: either its own is_panicked
   flag is set or its logger reports a panic.  A null environment is never panicked. */
static int env_is_panicked(DB_ENV *dbenv) {
    if (!dbenv) {
        return 0;
    }
    if (dbenv->i->is_panicked) {
        return 1;
    }
    return toku_logger_panicked(dbenv->i->logger) ? 1 : 0;
}

/* Guard macros placed at the top of API functions: if the environment is
   panicked, make the *enclosing function* return EINVAL immediately.
   The arguments are parenthesized so that any pointer expression (not just a
   plain identifier) expands correctly. */
#define HANDLE_PANICKED_ENV(env) ({ if (env_is_panicked(env)) return EINVAL; })
#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV((db)->dbenv)


/* db methods */
/* Return 1 if the db has a full file name recorded (i.e. it has been opened), else 0. */
static inline int db_opened(DB *db) {
    if (db->i->full_fname != 0) {
        return 1;
    }
    return 0;
}

static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags);

/* txn methods */

/* cursor methods */
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag);
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag);
static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag);
static int toku_c_del(DBC *c, u_int32_t flags);
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags);
static int toku_c_close(DBC * c);
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
112

Rich Prohaska's avatar
Rich Prohaska committed
113
/* misc */
Yoni Fogel's avatar
Yoni Fogel committed
114
static char *construct_full_name(const char *dir, const char *fname);
115
static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondary);
Yoni Fogel's avatar
Yoni Fogel committed
116
    
117

118 119 120 121
// If errcall is set, call it with the format string and optionally the stderrstring (if include_stderrstring).  The prefix is passed as a separate argument.
// If errfile is set, print to the errfile: prefix, fmt string, maybe include the stderr string.
// Both errcall and errfile may be called.
// If errfile is not set and errcall is not set, the use stderr as the errfile.
122
void toku_do_error_all_cases(const DB_ENV * env, int error, int include_stderrstring, int use_stderr_if_nothing_else, const char *fmt, va_list ap) {
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
    if (env->i->errcall) {
	// errcall gets prefix sent separately
	// the error message is the printf message, maybe followed by ": " and the dbstrerror (if include_stderrstring is set)
	char buf [4000];
	int count=0;
	if (fmt) {
	    count=vsnprintf(buf, sizeof(buf), fmt, ap);
	}
	if (include_stderrstring) {
	    count+=snprintf(&buf[count], sizeof(buf)-count, ": %s", db_strerror(error));
	}
	env->i->errcall(env, env->i->errpfx, buf);
    }
    {
	FILE *efile=env->i->errfile;
	if (efile==0 && env->i->errcall==0 && use_stderr_if_nothing_else) {
	    efile = stderr;
	}
	if (efile) {
	    if (env->i->errpfx) fprintf(efile, "%s: ", env->i->errpfx);
	    vfprintf(efile, fmt, ap);
	    if (include_stderrstring) {
		fprintf(efile, ": %s", db_strerror(error));
	    }
	}
    }
149 150
}

151 152
// Report `error` on behalf of dbenv and hand the same code back to the caller,
// so it can be used as `return do_error(...)`.
// Handles all the error cases (errcall / errfile) but does not do the default
// thing of falling back to stderr.  The db_strerror() text is appended to the message.
// As a side effect, a panicked logger marks the environment itself as panicked.
static int do_error (DB_ENV *dbenv, int error, const char *string, ...) {
    va_list args;

    if (toku_logger_panicked(dbenv->i->logger)) {
        dbenv->i->is_panicked = 1;
    }

    va_start(args, string);
    toku_do_error_all_cases(dbenv, error, 1, 0, string, args);
    va_end(args);

    return error;
}

Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
161 162
// Debugging helpers: print a "BARF" (unimplemented / unexpected path) or "Note"
// marker to stderr, tagged with the source file, line and function.
// The *f variants additionally print a printf-style message.
#define barf() ({ fprintf(stderr, "YDB: BARF %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define barff(fmt,...) ({ fprintf(stderr, "YDB: BARF %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); })
#define note() ({ fprintf(stderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define notef(fmt,...) ({ fprintf(stderr, "YDB: Note %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); })

166
#if 0
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
167 168 169
static void print_flags(u_int32_t flags) {
    u_int32_t gotit = 0;
    int doneone = 0;
Bradley C. Kuszmaul's avatar
Fixup  
Bradley C. Kuszmaul committed
170
#define doit(flag) if (flag & flags) { if (doneone) fprintf(stderr, " | "); fprintf(stderr, "%s", #flag);  doneone=1; gotit|=flag; }
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
171 172 173 174 175 176 177 178 179 180 181
    doit(DB_INIT_LOCK);
    doit(DB_INIT_LOG);
    doit(DB_INIT_MPOOL);
    doit(DB_INIT_TXN);
    doit(DB_CREATE);
    doit(DB_THREAD);
    doit(DB_RECOVER);
    doit(DB_PRIVATE);
    if (gotit != flags)
        fprintf(stderr, "  flags 0x%x not accounted for", flags & ~gotit);
    fprintf(stderr, "\n");
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
182
}
183
#endif
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
184

Rich Prohaska's avatar
Rich Prohaska committed
185
/* Apply one DB_CONFIG directive to the environment.
   command is the directive name, value its (possibly empty) argument.
   Recognized directives are set_data_dir, set_tmp_dir and set_lg_dir; each is
   forwarded to the matching toku_env_set_* function and its result returned.
   Any other directive yields -1. */
static int env_parse_config_line(DB_ENV* dbenv, char *command, char *value) {
    if (strcmp(command, "set_data_dir") == 0) {
        return toku_env_set_data_dir(dbenv, value);
    }
    if (strcmp(command, "set_tmp_dir") == 0) {
        return toku_env_set_tmp_dir(dbenv, value);
    }
    if (strcmp(command, "set_lg_dir") == 0) {
        return toku_env_set_lg_dir(dbenv, value);
    }
    return -1;
}

Rich Prohaska's avatar
Rich Prohaska committed
202
/* Read the optional DB_CONFIG file in the environment's home directory and
   apply each directive with env_parse_config_line().

   Line format: leading white space is ignored; blank lines and lines whose
   first non-blank character is '#' are skipped; the first white-space-free
   token is the command and the rest of the line (with surrounding white space
   stripped) is the value, which may itself contain white space.

   Returns 0 on success or if the file does not exist, ENOMEM on allocation
   failure, an errno value for open/read failures, or the nonzero result of
   the failing directive.  If everything else succeeded, a failing fclose()
   result is returned. */
static int env_read_config(DB_ENV *env) {
    HANDLE_PANICKED_ENV(env);
    const char* config_name = "DB_CONFIG";
    char* full_name = NULL;
    char* linebuffer = NULL;
    int buffersize;
    FILE* fp = NULL;
    int r = 0;
    int r2 = 0;
    char* command;
    char* value;

    full_name = construct_full_name(env->i->dir, config_name);
    if (full_name == 0) {
        r = ENOMEM;
        goto cleanup;
    }
    if ((fp = fopen(full_name, "r")) == NULL) {
        //Config file is optional.
        r = (errno == ENOENT) ? 0 : errno;
        goto cleanup;
    }
    //Read each line, applying configuration parameters.
    int linenumber;
    int ch = '\0';
    BOOL eof = FALSE;
    char* temp;
    char* end;
    int index;

    buffersize = 1<<10; //1KB
    linebuffer = toku_malloc(buffersize);
    if (!linebuffer) {
        r = ENOMEM;
        goto cleanup;
    }
    for (linenumber = 1; !eof; linenumber++) {
        /* Read a single line. */
        for (index = 0; TRUE; index++) {
            if ((ch = getc(fp)) == EOF) {
                eof = TRUE;
                if (ferror(fp)) {
                    /* Throw away current line and print warning. */
                    /* Never report a read failure as success, even if errno was left at 0. */
                    r = errno ? errno : EIO;
                    goto readerror;
                }
                break;
            }
            if (ch == '\n') break;
            if (index + 1 >= buffersize) {
                //Double the buffer.  Keep the old pointer until the new one is known
                //to be good so that cleanup can still free it on failure
                //(assumes toku_realloc follows realloc semantics -- TODO confirm).
                char* bigger = toku_realloc(linebuffer, buffersize * 2);
                if (!bigger) {
                    r = ENOMEM;
                    goto cleanup;
                }
                linebuffer = bigger;
                buffersize *= 2;
            }
            linebuffer[index] = ch;
        }
        linebuffer[index] = '\0';
        end = &linebuffer[index];

        /* Separate the line into command/value.
           Every scan checks the bound BEFORE dereferencing, and the characters
           are passed to isspace() as unsigned char as <ctype.h> requires. */
        command = linebuffer;
        //Strip leading spaces.
        while (command < end && isspace((unsigned char) *command)) command++;
        //Find end of command.
        temp = command;
        while (temp < end && !isspace((unsigned char) *temp)) temp++;
        //Null terminate command.  When the command runs to the end of the line
        //it is already terminated, and we must not step past `end`.
        if (temp < end) *temp++ = '\0';
        value = temp;
        //Strip leading spaces.
        while (value < end && isspace((unsigned char) *value)) value++;
        if (value < end) {
            //Strip trailing spaces.  *value is non-space, so this stops at or after value+1.
            temp = end;
            while (isspace((unsigned char) *(temp-1))) temp--;
            //Null terminate value.
            *temp = '\0';
        }
        //Parse the line.
        if (strlen(command) == 0 || command[0] == '#') continue; //Ignore Comments.
        r = env_parse_config_line(env, command, value < end ? value : "");
        if (r != 0) goto parseerror;
    }
    goto cleanup;

readerror:
    do_error(env, r, "Error reading from DB_CONFIG:%d.\n", linenumber);
    goto cleanup;

parseerror:
    do_error(env, r, "Error parsing DB_CONFIG:%d.\n", linenumber);

cleanup:
    if (full_name) toku_free(full_name);
    if (linebuffer) toku_free(linebuffer);
    if (fp) r2 = fclose(fp);
    return r ? r : r2;
}

Rich Prohaska's avatar
Rich Prohaska committed
310
/* Open an environment.

   home   directory of the environment.  If NULL, DB_HOME from the process
          environment is used when DB_USE_ENVIRON is given (or DB_USE_ENVIRON_ROOT
          and the effective uid is 0); otherwise "." is used.
   flags  must include DB_PRIVATE.  DB_USE_ENVIRON and DB_USE_ENVIRON_ROOT are
          mutually exclusive and may not be combined with an explicit home.
          DB_INIT_TXN or DB_INIT_LOG causes the logger to be opened.
   mode   recorded in the environment as open_mode.

   Steps: validate arguments, check that home exists, record it, read DB_CONFIG,
   open the logger if requested, create the cachetable.

   Returns 0 on success, EINVAL for bad arguments or an already-open env,
   ENOMEM on allocation failure, an errno value if home cannot be stat()ed, or
   the error from the failing sub-step.  On failure after the home directory
   was recorded, env->i->dir is released and reset to NULL; on logger or
   cachetable failure the logger is closed as well. */
static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
    HANDLE_PANICKED_ENV(env);
    int r;

    if (env_opened(env)) {
        return do_error(env, EINVAL, "The environment is already open\n");
    }

    if ((flags & DB_USE_ENVIRON) && (flags & DB_USE_ENVIRON_ROOT)) {
        return do_error(env, EINVAL, "DB_USE_ENVIRON and DB_USE_ENVIRON_ROOT are incompatible flags\n");
    }

    if (home) {
        if ((flags & DB_USE_ENVIRON) || (flags & DB_USE_ENVIRON_ROOT)) {
            return do_error(env, EINVAL, "DB_USE_ENVIRON and DB_USE_ENVIRON_ROOT are incompatible with specifying a home\n");
        }
    }
    else if ((flags & DB_USE_ENVIRON) ||
             ((flags & DB_USE_ENVIRON_ROOT) && geteuid() == 0)) home = getenv("DB_HOME");

    if (!home) home = ".";

    // Verify that the home exists.
    {
        struct stat buf;
        r = stat(home, &buf);
        if (r!=0) {
            return do_error(env, errno, "Error from stat(\"%s\",...)\n", home);
        }
    }

    if (!(flags & DB_PRIVATE)) {
        return do_error(env, EINVAL, "TokuDB requires DB_PRIVATE when opening an env\n");
    }

    if (env->i->dir)
        toku_free(env->i->dir);
    env->i->dir = toku_strdup(home);
    if (env->i->dir == 0) {
        return do_error(env, ENOMEM, "Out of memory\n");
    }

    if ((r = env_read_config(env)) != 0) {
        goto died1;
    }

    env->i->open_flags = flags;
    env->i->open_mode = mode;

    if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
        char* full_dir = NULL;
        if (env->i->lg_dir) {
            full_dir = construct_full_name(env->i->dir, env->i->lg_dir);
            if (full_dir == NULL) {
                // Do not silently fall back to the home directory when a log
                // directory was requested but its path could not be built.
                // The logger has not been touched yet, so only undo the dir.
                r = do_error(env, ENOMEM, "Out of memory\n");
                goto died1;
            }
        }
        assert(env->i->logger);
        r = toku_logger_open(full_dir ? full_dir : env->i->dir, env->i->logger);
        if (full_dir) toku_free(full_dir);
        if (r!=0) {
            do_error(env, r, "Could not open logger\n");
            goto died2;
        }
    }

    r = toku_brt_create_cachetable(&env->i->cachetable, env->i->cachetable_size, ZERO_LSN, env->i->logger);
    if (r!=0) goto died2;

    toku_logger_set_cachetable(env->i->logger, env->i->cachetable);

    return 0;

died2:
    toku_logger_close(&env->i->logger);
died1:
    toku_free(env->i->dir);
    env->i->dir = NULL;
    return r;
}
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
385

Rich Prohaska's avatar
Rich Prohaska committed
386
/* Close and destroy an environment.

   The cachetable and logger are closed if present, every directory string is
   released, the environment structures are freed, and one library reference
   is dropped.  All of this happens regardless of the value returned, so env
   must not be used after this call.

   Returns EINVAL if flags is nonzero; otherwise the cachetable-close error,
   then the logger-close error, then EINVAL if the environment was panicked
   on entry; otherwise 0. */
static int toku_env_close(DB_ENV * env, u_int32_t flags) {
    // Even if the env is panicked, try to close as much as we can.
    int was_panicked = env_is_panicked(env);
    int cachetable_rc = 0;
    int logger_rc = 0;

    if (env->i->cachetable) {
        cachetable_rc = toku_cachetable_close(&env->i->cachetable);
    }
    if (env->i->logger) {
        logger_rc = toku_logger_close(&env->i->logger);
    }
    if (env->i->data_dirs) {
        u_int32_t d;
        assert(env->i->n_data_dirs > 0);
        for (d = 0; d < env->i->n_data_dirs; d++) {
            toku_free(env->i->data_dirs[d]);
        }
        toku_free(env->i->data_dirs);
    }
    if (env->i->lg_dir) {
        toku_free(env->i->lg_dir);
    }
    if (env->i->tmp_dir) {
        toku_free(env->i->tmp_dir);
    }
    toku_free(env->i->dir);
    toku_free(env->i);
    toku_free(env);
    ydb_unref();

    if (flags != 0) return EINVAL;
    if (cachetable_rc != 0) return cachetable_rc;
    if (logger_rc != 0) return logger_rc;
    return was_panicked ? EINVAL : 0;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
416

Rich Prohaska's avatar
Rich Prohaska committed
417
/* Stub: reports that there are no log files to archive by storing NULL in
   *list.  env and flags are ignored.  Always returns 0. */
static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
    (void) env;   // Suppress compiler warnings.
    (void) flags;
    *list = NULL;
    return 0;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
422

Rich Prohaska's avatar
Rich Prohaska committed
423
/* Not implemented: prints a BARF diagnostic to stderr and returns 1.
   Returns EINVAL instead if the environment is panicked.  lsn is ignored. */
static int toku_env_log_flush(DB_ENV * env, const DB_LSN * lsn) {
    HANDLE_PANICKED_ENV(env);
    (void) env;
    (void) lsn;
    barf();
    return 1;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
429

Rich Prohaska's avatar
Rich Prohaska committed
430
/* Set the cachetable size to gbytes GiB plus bytes bytes.
   Only a single cache is supported, so ncache must be 1.
   Returns 0 on success; EINVAL if ncache != 1, if the total does not fit in
   an unsigned long, or if the environment is panicked. */
static int toku_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncache) {
    HANDLE_PANICKED_ENV(env);
    if (ncache != 1) {
        return EINVAL;
    }
    u_int64_t total = ((u_int64_t) gbytes << 30) + bytes;
    unsigned long narrowed = total;
    if (narrowed < total) {
        // The requested size was truncated by the conversion to unsigned long.
        return EINVAL;
    }
    env->i->cachetable_size = narrowed;
    return 0;
}

Rich Prohaska's avatar
Rich Prohaska committed
442 443
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3

Rich Prohaska's avatar
Rich Prohaska committed
444
/* Report the configured cachetable size split into whole GiB (*gbytes) and the
   remaining low 30 bits (*bytes).  *ncache is always set to 1.
   Returns 0, or EINVAL if the environment is panicked. */
static int toku_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) {
    HANDLE_PANICKED_ENV(env);
    *ncache = 1;
    *bytes = env->i->cachetable_size & ((1<<30)-1);
    *gbytes = env->i->cachetable_size >> 30;
    return 0;
}

Rich Prohaska's avatar
Rich Prohaska committed
452 453 454 455
/* Big-lock wrapper: runs toku_env_get_cachesize() with the ydb lock held. */
static int locked_env_get_cachesize(DB_ENV *env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) {
    int result;
    ydb_lock();
    result = toku_env_get_cachesize(env, gbytes, bytes, ncache);
    ydb_unlock();
    return result;
}

Rich Prohaska's avatar
Rich Prohaska committed
456 457
#endif

Rich Prohaska's avatar
Rich Prohaska committed
458
/* Append dir to the environment's list of data directories.
   Must be called before the environment is opened.  A directory that is
   already in the list is accepted without being added again.  The string is
   copied; the caller keeps ownership of dir.

   Returns 0 on success, EINVAL if the environment is already open, panicked,
   or dir is NULL, and ENOMEM on allocation failure (in which case the list is
   left unchanged). */
static int toku_env_set_data_dir(DB_ENV * env, const char *dir) {
    HANDLE_PANICKED_ENV(env);
    u_int32_t i;
    char** temp;
    char* new_dir;

    if (env_opened(env)) {
        return do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
    }
    if (!dir) {
        // Report the real problem instead of the "after opening" message.
        return do_error(env, EINVAL, "Data dir must be non-null\n");
    }

    if (env->i->data_dirs) {
        assert(env->i->n_data_dirs > 0);
        for (i = 0; i < env->i->n_data_dirs; i++) {
            if (!strcmp(dir, env->i->data_dirs[i])) {
                //It is already in the list.  We're done.
                return 0;
            }
        }
    }
    else assert(env->i->n_data_dirs == 0);

    new_dir = toku_strdup(dir);
    if (new_dir==NULL) {
        assert(errno == ENOMEM);
        return do_error(env, errno, "Out of memory\n");
    }
    // Grow through a temporary so the existing list survives a failed realloc.
    temp = (char**) toku_realloc(env->i->data_dirs, (1 + env->i->n_data_dirs) * sizeof(char*));
    if (temp==NULL) {
        assert(errno == ENOMEM);
        toku_free(new_dir);
        return ENOMEM;
    }
    env->i->data_dirs = temp;
    env->i->data_dirs[env->i->n_data_dirs] = new_dir;
    env->i->n_data_dirs++;
    return 0;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
496

Rich Prohaska's avatar
Rich Prohaska committed
497
/* Install errcall as the environment's error callback (stored in env->i->errcall). */
static void toku_env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
    env->i->errcall = errcall;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
500

Rich Prohaska's avatar
Rich Prohaska committed
501
/* Set the stream that error messages are written to (stored in env->i->errfile). */
static void toku_env_set_errfile(DB_ENV*env, FILE*errfile) {
    env->i->errfile = errfile;
}

Rich Prohaska's avatar
Rich Prohaska committed
505
/* Set the prefix used for error messages.  Only the pointer is stored (the
   string is not copied), so errpfx must stay valid while it is in use. */
static void toku_env_set_errpfx(DB_ENV * env, const char *errpfx) {
    env->i->errpfx = errpfx;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
508

Rich Prohaska's avatar
Rich Prohaska committed
509
/* No environment flags are supported yet: turning on any nonzero flag is an
   error (EINVAL, reported through do_error); everything else is accepted and
   ignored.  Returns EINVAL if the environment is panicked. */
static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
    HANDLE_PANICKED_ENV(env);
    if (flags == 0 || !onoff) {
        return 0;
    }
    return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n");
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
516

Rich Prohaska's avatar
Rich Prohaska committed
517
/* Unsupported: always reports and returns EINVAL.  bsize is ignored. */
static int toku_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
    HANDLE_PANICKED_ENV(env);
    (void) bsize;
    return do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n");
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
522

Rich Prohaska's avatar
Rich Prohaska committed
523
/* Set (or, with dir == NULL, clear) the log directory.
   Must be called before the environment is opened.  Any previous setting is
   released and dir is copied.
   Returns 0 on success, EINVAL if the environment is open or panicked, and
   ENOMEM if the copy cannot be allocated (the setting is then left NULL). */
static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) {
    HANDLE_PANICKED_ENV(env);
    if (env_opened(env)) {
        return do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
    }

    if (env->i->lg_dir) {
        toku_free(env->i->lg_dir);
    }
    if (!dir) {
        env->i->lg_dir = NULL;
        return 0;
    }
    env->i->lg_dir = toku_strdup(dir);
    if (env->i->lg_dir) {
        return 0;
    }
    return do_error(env, ENOMEM, "Out of memory\n");
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
539

Rich Prohaska's avatar
Rich Prohaska committed
540
/* Unsupported: always reports and returns EINVAL.  lg_max is ignored. */
static int toku_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
    HANDLE_PANICKED_ENV(env);
    (void) lg_max;
    return do_error(env, EINVAL, "TokuDB does not (yet) support set_lg_max\n");
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
545

Rich Prohaska's avatar
Rich Prohaska committed
546
/* Unsupported: always reports and returns EINVAL.  detect is ignored. */
static int toku_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
    HANDLE_PANICKED_ENV(env);
    (void) detect;
    return do_error(env, EINVAL, "TokuDB does not (yet) support set_lk_detect\n");
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
551

552
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
Rich Prohaska's avatar
Rich Prohaska committed
553
/* Accepted but ignored: lk_max has no effect.  Returns 0, or EINVAL if the
   environment is panicked. */
static int toku_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) {
    HANDLE_PANICKED_ENV(env);
    (void) lk_max;
    return 0;
}
Rich Prohaska's avatar
Rich Prohaska committed
558 559 560 561

/* Big-lock wrapper: runs toku_env_set_lk_max() with the ydb lock held. */
static int locked_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) {
    int result;
    ydb_lock();
    result = toku_env_set_lk_max(env, lk_max);
    ydb_unlock();
    return result;
}
562
#endif
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
563

Rich Prohaska's avatar
Rich Prohaska committed
564
//void __toku_env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
Bradley C. Kuszmaul's avatar
Fixup  
Bradley C. Kuszmaul committed
565 566
//    env->i->noticecall = noticecall;
//}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
567

Rich Prohaska's avatar
Rich Prohaska committed
568
/* Set the directory used for temporary files.
   Must be called before the environment is opened, with a non-NULL tmp_dir.
   Any previous setting is released and tmp_dir is copied.
   Returns 0 on success, EINVAL if the environment is open or panicked or
   tmp_dir is NULL, and ENOMEM if the copy cannot be allocated.  Every failure
   is reported through do_error, matching toku_env_set_lg_dir. */
static int toku_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
    HANDLE_PANICKED_ENV(env);
    if (env_opened(env)) {
        return do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
    }
    if (!tmp_dir) {
        return do_error(env, EINVAL, "Tmp dir must be non-null\n");
    }
    if (env->i->tmp_dir)
        toku_free(env->i->tmp_dir);
    env->i->tmp_dir = toku_strdup(tmp_dir);
    if (!env->i->tmp_dir) {
        return do_error(env, ENOMEM, "Out of memory\n");
    }
    return 0;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
581

Rich Prohaska's avatar
Rich Prohaska committed
582
/* Not implemented: which and onoff are ignored and 1 is returned
   (EINVAL if the environment is panicked). */
static int toku_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
    HANDLE_PANICKED_ENV(env);
    (void) which;
    (void) onoff;
    return 1;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
587

Rich Prohaska's avatar
Rich Prohaska committed
588
/* Stub: does nothing and always returns 0.  All arguments are ignored, and
   the panic state of the environment is not consulted. */
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
    (void) env;
    (void) kbyte;
    (void) min;
    (void) flags;
    return 0;
}

Rich Prohaska's avatar
Rich Prohaska committed
593
/* Not implemented: statp and flags are ignored (nothing is stored through
   statp) and 1 is returned (EINVAL if the environment is panicked). */
static int toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) {
    HANDLE_PANICKED_ENV(env);
    (void) statp;
    (void) flags;
    return 1;
}

599
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1
600
void toku_default_errcall(const char *errpfx, char *msg) {
601
#else
602
void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg) {
603 604
    env = env;
#endif
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
605 606 607
    fprintf(stderr, "YDB: %s: %s", errpfx, msg);
}

Rich Prohaska's avatar
Rich Prohaska committed
608
#if _THREAD_SAFE
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
609

Rich Prohaska's avatar
Rich Prohaska committed
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683
static void locked_env_err(const DB_ENV * env, int error, const char *fmt, ...) {
    ydb_lock();
    va_list ap;
    va_start(ap, fmt);
    toku_do_error_all_cases(env, error, 1, 1, fmt, ap);
    va_end(ap);
    ydb_unlock();
}

/* Big-lock wrapper: runs toku_env_open() with the ydb lock held. */
static int locked_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
    int result;
    ydb_lock();
    result = toku_env_open(env, home, flags, mode);
    ydb_unlock();
    return result;
}

/* Big-lock wrapper: runs toku_env_close() with the ydb lock held. */
static int locked_env_close(DB_ENV * env, u_int32_t flags) {
    int result;
    ydb_lock();
    result = toku_env_close(env, flags);
    ydb_unlock();
    return result;
}

/* Big-lock wrapper: runs toku_env_log_archive() with the ydb lock held. */
static int locked_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
    int result;
    ydb_lock();
    result = toku_env_log_archive(env, list, flags);
    ydb_unlock();
    return result;
}

/* Big-lock wrapper: runs toku_env_log_flush() with the ydb lock held. */
static int locked_env_log_flush(DB_ENV * env, const DB_LSN * lsn) {
    int result;
    ydb_lock();
    result = toku_env_log_flush(env, lsn);
    ydb_unlock();
    return result;
}

/* Big-lock wrapper: runs toku_env_set_cachesize() with the ydb lock held. */
static int locked_env_set_cachesize(DB_ENV *env, u_int32_t gbytes, u_int32_t bytes, int ncache) {
    int result;
    ydb_lock();
    result = toku_env_set_cachesize(env, gbytes, bytes, ncache);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_data_dir() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_data_dir(DB_ENV * env, const char *dir) {
    int result;

    ydb_lock();
    result = toku_env_set_data_dir(env, dir);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_flags() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
    int result;

    ydb_lock();
    result = toku_env_set_flags(env, flags, onoff);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_lg_bsize() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
    int result;

    ydb_lock();
    result = toku_env_set_lg_bsize(env, bsize);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_lg_dir() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_lg_dir(DB_ENV * env, const char *dir) {
    int result;

    ydb_lock();
    result = toku_env_set_lg_dir(env, dir);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_lg_max() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
    int result;

    ydb_lock();
    result = toku_env_set_lg_max(env, lg_max);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_lk_detect() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
    int result;

    ydb_lock();
    result = toku_env_set_lk_detect(env, detect);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_tmp_dir() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
    int result;

    ydb_lock();
    result = toku_env_set_tmp_dir(env, tmp_dir);
    ydb_unlock();
    return result;
}

/* Calls toku_env_set_verbose() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
    int result;

    ydb_lock();
    result = toku_env_set_verbose(env, which, onoff);
    ydb_unlock();
    return result;
}

/* Calls toku_env_txn_checkpoint() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
    int result;

    ydb_lock();
    result = toku_env_txn_checkpoint(env, kbyte, min, flags);
    ydb_unlock();
    return result;
}

/* Calls toku_env_txn_stat() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) {
    int result;

    ydb_lock();
    result = toku_env_txn_stat(env, statp, flags);
    ydb_unlock();
    return result;
}

static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);

#endif

/* Allocate a zeroed DB_ENV, fill in its method table, allocate its internal
 * state and create its logger.
 *
 * envp  - receives the new environment on success.
 * flags - must be 0.
 *
 * Returns 0 on success, EINVAL for nonzero flags, ENOMEM when an allocation
 * fails, or the error from toku_logger_create().  Nothing stays allocated on
 * failure and *envp is left untouched. */
static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
    int r;

    if (flags != 0) return EINVAL;

    DB_ENV *MALLOC(env);
    if (env == 0) return ENOMEM;
    memset(env, 0, sizeof *env);

    // Method table.  Most entries are the lock-taking wrappers; the three
    // error-configuration setters are installed directly.
    env->err = locked_env_err;
    env->open = locked_env_open;
    env->close = locked_env_close;
    env->txn_checkpoint = locked_env_txn_checkpoint;
    env->log_flush = locked_env_log_flush;
    env->set_errcall = toku_env_set_errcall;
    env->set_errfile = toku_env_set_errfile;
    env->set_errpfx = toku_env_set_errpfx;
    //env->set_noticecall = locked_env_set_noticecall;
    env->set_flags = locked_env_set_flags;
    env->set_data_dir = locked_env_set_data_dir;
    env->set_tmp_dir = locked_env_set_tmp_dir;
    env->set_verbose = locked_env_set_verbose;
    env->set_lg_bsize = locked_env_set_lg_bsize;
    env->set_lg_dir = locked_env_set_lg_dir;
    env->set_lg_max = locked_env_set_lg_max;
    env->set_cachesize = locked_env_set_cachesize;
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
    env->get_cachesize = locked_env_get_cachesize;
#endif
    env->set_lk_detect = locked_env_set_lk_detect;
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
    env->set_lk_max = locked_env_set_lk_max;
#endif
    env->log_archive = locked_env_log_archive;
    env->txn_stat = locked_env_txn_stat;
    env->txn_begin = locked_txn_begin;

    // Private state.
    MALLOC(env->i);
    if (env->i == 0) {
        r = ENOMEM;
        goto free_env;
    }
    memset(env->i, 0, sizeof *env->i);
    env->i->is_panicked = 0;
    env->i->ref_count = 1;
    env->i->errcall = 0;
    env->i->errpfx = 0;
    env->i->errfile = 0;

    r = toku_logger_create(&env->i->logger);
    if (r != 0) goto free_internal;
    assert(env->i->logger);

    ydb_add_ref();
    *envp = env;
    return 0;

free_internal:
    toku_free(env->i);
free_env:
    toku_free(env);
    return r;
}

Rich Prohaska's avatar
Rich Prohaska committed
744 745 746 747 748
/* Public constructor: calls toku_env_create() between ydb_lock() and
 * ydb_unlock() and returns its result. */
int db_env_create(DB_ENV ** envp, u_int32_t flags) {
    int result;

    ydb_lock();
    result = toku_env_create(envp, flags);
    ydb_unlock();
    return result;
}

/* Commit a transaction and destroy its handle.
 *
 * txn   - transaction to commit; NULL is rejected with EINVAL.
 * flags - only DB_TXN_NOSYNC is accepted; any other bit yields EINVAL.
 *
 * Returns the result of toku_logger_commit(), or EINVAL.  Once past the
 * NULL and panic checks the handle is freed on every path, so the caller
 * must not touch txn afterwards. */
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
    // Validate the pointer before HANDLE_PANICKED_ENV dereferences it.
    if (!txn) return EINVAL;
    HANDLE_PANICKED_ENV(txn->mgrp);
    //notef("flags=%d\n", flags);
    int r;
    int nosync = (flags & DB_TXN_NOSYNC)!=0;
    flags &= ~DB_TXN_NOSYNC;
    if (flags != 0) {
        // Unsupported flags: no commit is logged, but the transaction is
        // still torn down.
        r = EINVAL;
        if (txn->i)
            toku_free(txn->i->tokutxn);
    } else {
        r = toku_logger_commit(txn->i->tokutxn, nosync);
    }
    // Cleanup
    if (txn->i)
        toku_free(txn->i);
    toku_free(txn);
    return r; // The txn is no good after the commit.
}

Rich Prohaska's avatar
Rich Prohaska committed
769
/* Not implemented: after the panic check this calls barf() and then abort().
 * The trailing return is unreachable and only satisfies the signature. */
static u_int32_t toku_txn_id(DB_TXN * txn) {
    HANDLE_PANICKED_ENV(txn->mgrp);
    barf();
    abort();
    return (u_int32_t)-1;
}

static TXNID next_txn = 0;

778
/* Abort a transaction via toku_logger_abort(), then free the handle and its
 * internal state.  Returns the logger's result; txn is invalid afterwards. */
static int toku_txn_abort(DB_TXN * txn) {
    HANDLE_PANICKED_ENV(txn->mgrp);
    int abort_result = toku_logger_abort(txn->i->tokutxn);

    toku_free(txn->i);
    toku_free(txn);
    return abort_result;
}

Rich Prohaska's avatar
Rich Prohaska committed
786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808
#if _THREAD_SAFE

static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);

/* Calls toku_txn_begin() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
    int result;

    ydb_lock();
    result = toku_txn_begin(env, stxn, txn, flags);
    ydb_unlock();
    return result;
}

/* Calls toku_txn_id() between ydb_lock() and ydb_unlock(); returns its result. */
static u_int32_t locked_txn_id(DB_TXN *txn) {
    u_int32_t id;

    ydb_lock();
    id = toku_txn_id(txn);
    ydb_unlock();
    return id;
}

/* Calls toku_txn_commit() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_txn_commit(DB_TXN *txn, u_int32_t flags) {
    int result;

    ydb_lock();
    result = toku_txn_commit(txn, flags);
    ydb_unlock();
    return result;
}

/* Calls toku_txn_abort() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_txn_abort(DB_TXN *txn) {
    int result;

    ydb_lock();
    result = toku_txn_abort(txn);
    ydb_unlock();
    return result;
}

#endif

/* Begin a transaction in env.
 *
 * env   - environment; its logger must be open.
 * stxn  - parent transaction, or NULL for a top-level transaction.
 * txn   - receives the new handle on success.
 * flags - currently ignored.
 *
 * Returns 0 on success, the do_error() result when logging is not enabled,
 * ENOMEM when an allocation fails, or the error from
 * toku_logger_txn_begin().  On failure nothing stays allocated and *txn is
 * left untouched. */
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
    HANDLE_PANICKED_ENV(env);
    if (!toku_logger_is_open(env->i->logger)) return do_error(env, EINVAL, "Environment does not have logging enabled\n");
    (void)flags;
    DB_TXN *MALLOC(result);
    if (result == 0)
        return ENOMEM;
    memset(result, 0, sizeof *result);
    //notef("parent=%p flags=0x%x\n", stxn, flags);
    result->mgrp = env;
    result->abort = locked_txn_abort;
    result->commit = locked_txn_commit;
    result->id = locked_txn_id;
    MALLOC(result->i);
    if (result->i == 0) {
        // Report allocation failure instead of asserting, and release the
        // outer handle.
        toku_free(result);
        return ENOMEM;
    }
    // Zero the internal state so no field is left indeterminate.
    memset(result->i, 0, sizeof *result->i);
    result->i->parent = stxn;
    int r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
    if (r != 0) {
        // Release the half-built handle rather than leaking it.
        toku_free(result->i);
        toku_free(result);
        return r;
    }
    *txn = result;
    return 0;
}

Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
831
#if 0
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
832 833
int txn_commit(DB_TXN * txn, u_int32_t flags) {
    fprintf(stderr, "%s:%d\n", __FILE__, __LINE__);
834
    return toku_logger_log_commit(txn->i->tokutxn);
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
835
}
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
836
#endif
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
837

Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
838
/* Not implemented: prints the call site and both LSN pointers to stderr,
 * then aborts the process.  Nothing after abort() is reached, so no value
 * is ever returned. */
int log_compare(const DB_LSN * a, const DB_LSN * b) {
    ydb_lock();
    // %p requires a void pointer argument, so cast explicitly.
    fprintf(stderr, "%s:%d log_compare(%p,%p)\n", __FILE__, __LINE__, (const void *)a, (const void *)b);
    abort();
    ydb_unlock();
}

845 846
/* Back-fill a newly associated secondary (the DB_CREATE case of associate).
 *
 * If the secondary already holds at least one record nothing is done.
 * Otherwise every (key, data) pair of the primary is passed to
 * do_associated_inserts() for the secondary.
 *
 * Returns 0 on success or the first error from a cursor operation or from
 * do_associated_inserts().  Both cursors are closed on every path. */
static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) {
    DBC *dbc;
    DBT key, data;
    int r, r2;

    // Probe the secondary for a first record.
    r = toku_db_cursor(secondary, txn, &dbc, 0);
    if (r!=0) return r;
    // The DBTs must be zeroed before use; their flags are read by the get.
    memset(&key, 0, sizeof key);
    memset(&data, 0, sizeof data);
    r = toku_c_get(dbc, &key, &data, DB_FIRST);
    r2 = toku_c_close(dbc);
    if (r == 0) return r2;           // Secondary is not empty: nothing to populate.
    if (r != DB_NOTFOUND) return r;  // A real probe failure must not be hidden.
    if (r2 != 0) return r2;

    /* Now we know the secondary is empty. */
    r = toku_db_cursor(primary, txn, &dbc, 0);
    if (r!=0) return r;
    memset(&key, 0, sizeof key);
    memset(&data, 0, sizeof data);
    r = toku_c_get(dbc, &key, &data, DB_FIRST);
    while (r == 0) {
        r = do_associated_inserts(txn, &key, &data, secondary);
        if (r != 0) {
            toku_c_close(dbc);
            return r;
        }
        r = toku_c_get(dbc, &key, &data, DB_NEXT);
    }
    // The scan normally ends with DB_NOTFOUND; anything else is an error.
    r2 = toku_c_close(dbc);
    if (r != DB_NOTFOUND) return r;
    return r2;
}

870 871 872
/* Associate a secondary index with a primary database.
 *
 * primary   - must not itself be a secondary and must not allow duplicates.
 * txn       - transaction used if the secondary is populated (DB_CREATE).
 * secondary - must not already be associated with anything.
 * callback  - derives the secondary key from a primary (key, data) pair.
 * flags     - 0 or DB_CREATE, optionally or'd with DB_IMMUTABLE_KEY where
 *             that macro exists.
 *
 * Returns 0 on success, EINVAL for any violated precondition, or the error
 * from maybe_do_associate_create().  All arguments are validated before the
 * secondary is modified, so an EINVAL return leaves both handles unchanged.
 * NOTE(review): if maybe_do_associate_create() fails, the association itself
 * stays in place - confirm that is what callers expect. */
static int toku_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
			      int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result),
			      u_int32_t flags) {
    HANDLE_PANICKED_DB(primary);
    HANDLE_PANICKED_DB(secondary);
    unsigned int brtflags;
    int is_immutable = 0;

    if (secondary->i->primary) return EINVAL; // The secondary already has a primary
    if (primary->i->primary)   return EINVAL; // The primary already has a primary

    toku_brt_get_flags(primary->i->brt, &brtflags);
    if (brtflags & TOKU_DB_DUPSORT) return EINVAL;  //The primary may not have duplicate keys.
    if (brtflags & TOKU_DB_DUP)     return EINVAL;  //The primary may not have duplicate keys.

#ifdef DB_IMMUTABLE_KEY
    is_immutable = (DB_IMMUTABLE_KEY&flags)!=0;
    flags &= ~DB_IMMUTABLE_KEY;
#endif
    // Validate the flags BEFORE touching the secondary, so a rejected call
    // does not leave associate_callback set.
    if (flags!=0 && flags!=DB_CREATE) return EINVAL; // after removing DB_IMMUTABLE_KEY the flags better be 0 or DB_CREATE

    if (!list_empty(&secondary->i->associated)) return EINVAL; // The secondary is in some list (or it is a primary)
    assert(secondary->i->associate_callback==0);      // Something's wrong if this isn't null once we made it this far.

    secondary->i->associate_callback = callback;
    secondary->i->associate_is_immutable = is_immutable;
    list_push(&primary->i->associated, &secondary->i->associated);
    secondary->i->primary = primary;
    if (flags==DB_CREATE) {
	// If the secondary is empty, step through the primary doing the
	// callbacks and insert each callback result into the secondary.
	return maybe_do_associate_create(txn, primary, secondary);
    }
    return 0;
}

904
/* Close a database handle.
 *
 * A primary (primary == 0) first removes every entry from its list of
 * associated secondaries; a secondary removes itself from the list it is
 * on.  The BRT is then closed; if that fails its error is returned and
 * nothing further is released.  Otherwise the environment reference is
 * dropped, the handle's memory is freed and ydb_unref() is called.
 *
 * flags is ignored.  Returns 0, the toku_close_brt() error, or EINVAL when
 * the environment was panicked at the time of the close. */
static int toku_db_close(DB * db, u_int32_t flags) {
    struct list *links = &db->i->associated;

    (void)flags;
    if (db->i->primary != 0) {
	// It is a secondary.  Remove it from the list if it is on one.
	if (!list_empty(links)) {
	    list_remove(links);
	}
    } else {
	// It is a primary.  Unlink all the secondaries.
	while (!list_empty(links)) {
	    struct list *first = list_head(links);
	    assert(list_struct(first,
			       struct __toku_db_internal,
			       associated)->primary==db);
	    list_remove(first);
	}
    }

    int close_result = toku_close_brt(db->i->brt);
    if (close_result != 0)
        return close_result;

    // Sample the panic state before dropping our reference on the
    // environment.  Even if panicked, close as much as we can.
    int was_panicked = env_is_panicked(db->dbenv);
    env_unref(db->dbenv);
    toku_free(db->i->database_name);
    toku_free(db->i->full_fname);
    toku_free(db->i);
    toku_free(db);
    ydb_unref();
    return was_panicked ? EINVAL : 0;
}

Yoni Fogel's avatar
Yoni Fogel committed
935 936 937 938 939 940 941 942 943 944 945 946 947 948 949
/* Check that skey is the secondary key the associate callback derives from
 * the primary pair (pkey, data).
 *
 * Returns 0 when the derived key matches skey byte for byte,
 * DB_SECONDARY_BAD when it differs or the callback answers DB_DONOTINDEX,
 * and EINVAL when the callback produces a DB_DBT_MULTIPLE result (not
 * supported).  Any other callback return value is ignored, as before. */
static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) {
    int r = 0;
    DBT idx;

    assert(secondary->i->primary != 0);
    memset(&idx, 0, sizeof(idx));
    // Capture the callback's verdict; previously it was discarded, so the
    // DB_DONOTINDEX test below could never fire.
    int callback_result = secondary->i->associate_callback(secondary, pkey, data, &idx);
    if (callback_result==DB_DONOTINDEX) return DB_SECONDARY_BAD;
#ifdef DB_DBT_MULTIPLE
    if (idx.flags & DB_DBT_MULTIPLE) {
        return EINVAL; // We aren't ready for this
    }
#endif
    if (skey->size != idx.size || memcmp(skey->data, idx.data, idx.size) != 0) r = DB_SECONDARY_BAD;
    if (idx.flags & DB_DBT_APPMALLOC) {
        // The callback allocated idx.data and handed ownership to us.
        toku_free(idx.data);
    }
    return r;
}

Yoni Fogel's avatar
Yoni Fogel committed
955
/* Cursor get straight from the cursor's own BRT, with no primary/secondary
 * handling.  The cursor's transaction, if it has one, is passed along as
 * its tokutxn; otherwise 0 is passed. */
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
    HANDLE_PANICKED_DB(c->dbp);
    if (c->i->txn)
        return toku_brt_cursor_get(c->i->c, key, data, flag, c->i->txn->i->tokutxn);
    return toku_brt_cursor_get(c->i->c, key, data, flag, 0);
}

Yoni Fogel's avatar
Yoni Fogel committed
961
/* Delete through the cursor's own BRT, with no primary/secondary handling.
 * The cursor's transaction, if it has one, is passed along as its tokutxn;
 * otherwise 0 is passed. */
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
    HANDLE_PANICKED_DB(c->dbp);
    if (c->i->txn)
        return toku_brt_cursor_delete(c->i->c, flags, c->i->txn->i->tokutxn);
    return toku_brt_cursor_delete(c->i->c, flags, 0);
}

967 968 969 970 971 972 973 974 975 976 977 978 979 980 981
/* Strip the modifier bits that may be or'd into a cursor flag (DB_RMW plus,
 * where the macros exist, DB_READ_UNCOMMITTED, DB_MULTIPLE and
 * DB_MULTIPLE_KEY) and return what remains: the main operation code. */
static int get_main_cursor_flag(u_int32_t flag) {
    u_int32_t modifiers = DB_RMW;

#ifdef DB_READ_UNCOMMITTED
    modifiers |= DB_READ_UNCOMMITTED;
#endif
#ifdef DB_MULTIPLE
    modifiers |= DB_MULTIPLE;
#endif
#ifdef DB_MULTIPLE_KEY
    modifiers |= DB_MULTIPLE_KEY;
#endif
    return flag & ~modifiers;
}

982 983
/* Make a private, heap-backed copy of *src in *dst for use by toku_c_pget.
 *
 * dst starts as a struct copy of src, is flagged DB_DBT_REALLOC with
 * ulen 0 (DB_DBT_USERMEM is not used), and receives its own copy of the
 * payload so it can be reused several times and freed once at the end.
 *
 * Returns 0 on success (dst->data is NULL when src->size is 0), EINVAL when
 * src has a nonzero size but no data pointer, ENOMEM when the copy cannot
 * be allocated. */
static int toku_c_pget_save_original_data(DBT* dst, DBT* src) {
#ifdef DB_DBT_PARTIAL
#error toku_c_pget does not properly handle DB_DBT_PARTIAL
#endif
    *dst = *src;
    dst->flags = DB_DBT_REALLOC;
    dst->ulen = 0;

    if (src->size == 0) {
        dst->data = NULL;
        return 0;
    }
    if (src->data == NULL) return EINVAL;

    dst->data = toku_malloc(src->size);
    if (dst->data == NULL) return ENOMEM;
    memcpy(dst->data, src->data, src->size);
    return 0;
}
1005

Yoni Fogel's avatar
 
Yoni Fogel committed
1006 1007
/* Cursor get on a secondary index that also returns the primary key.
 *
 * c    - cursor on a secondary; EINVAL if its database has no primary.
 * key  - secondary key (in/out), pkey - primary key (in/out),
 * data - primary data (in/out), flag - cursor operation.
 *
 * Each attempt works on private copies of the three DBTs.  The secondary is
 * read to get (key, pkey), then the primary is looked up by pkey.  If the
 * primary has no such record, or verify_secondary_key() rejects the pair,
 * the secondary entry under the cursor is deleted and the whole attempt is
 * repeated from fresh copies.  Only on success are the results copied into
 * the caller's DBTs.
 *
 * Returns 0 on success, otherwise the first error encountered. */
static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) {
    int r;
    int r2;
    int r3;
    DB *db = c->dbp;
    HANDLE_PANICKED_DB(db);
    DB *pdb = db->i->primary;

    if (!pdb) return EINVAL;  //c_pget does not work on a primary.
    // If data and primary_key are both zeroed, the temporary storage used to
    // fill in data is different in the two cases because they come from
    // different trees.
    assert(db->i->brt!=pdb->i->brt); // Make sure they really are different trees.
    assert(db!=pdb);

    // Working copies; the caller's DBTs are untouched until success.
    DBT skey_tmp;
    DBT pkey_tmp;
    DBT data_tmp;

    for (;;) {
        r = toku_c_pget_save_original_data(&skey_tmp, key);
        if (r != 0) return r;
        r = toku_c_pget_save_original_data(&pkey_tmp, pkey);
        if (r != 0) goto free_skey;
        r = toku_c_pget_save_original_data(&data_tmp, data);
        if (r != 0) goto free_pkey;

        r = toku_c_get_noassociate(c, &skey_tmp, &pkey_tmp, flag);
        if (r != 0) goto free_all;

        r = toku_db_get(pdb, c->i->txn, &pkey_tmp, &data_tmp, 0);
        if (r != 0 && r != DB_NOTFOUND) goto free_all;
        if (r == 0) {
            r = verify_secondary_key(db, &pkey_tmp, &data_tmp, &skey_tmp);
            if (r == 0) break;
        }

        // The secondary entry is stale: drop the copies, silently delete
        // the entry and re-run.
        toku_free(skey_tmp.data);
        toku_free(pkey_tmp.data);
        toku_free(data_tmp.data);
        r = toku_c_del_noassociate(c, 0);
        if (r != 0) return r;
    }

    //Copy everything out and return.
    r  = toku_brt_dbt_set_key(db->i->brt,  key,  skey_tmp.data, skey_tmp.size);
    r2 = toku_brt_dbt_set_key(pdb->i->brt, pkey, pkey_tmp.data, pkey_tmp.size);
    r3 = toku_brt_dbt_set_value(pdb->i->brt, data, data_tmp.data, data_tmp.size);

    toku_free(skey_tmp.data);
    toku_free(pkey_tmp.data);
    toku_free(data_tmp.data);
    if (r != 0) return r;
    if (r2 != 0) return r2;
    return r3;

free_all:
    toku_free(data_tmp.data);
free_pkey:
    toku_free(pkey_tmp.data);
free_skey:
    toku_free(skey_tmp.data);
    return r;
}

Yoni Fogel's avatar
Yoni Fogel committed
1089
/* Cursor get.  On a primary this is a plain toku_c_get_noassociate().  On a
 * secondary it goes through toku_c_pget() with a scratch primary key, so the
 * caller receives the secondary key and the primary's data. */
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
    DB *db = c->dbp;
    HANDLE_PANICKED_DB(db);

    if (db->i->primary == 0)
        return toku_c_get_noassociate(c, key, data, flag);

    /* It is an error to use the DB_GET_BOTH or DB_GET_BOTH_RANGE flag on a
     * cursor that has been opened on a secondary index handle.
     */
    u_int32_t main_flag = get_main_cursor_flag(flag);
    if (main_flag == DB_GET_BOTH) return EINVAL;
#ifdef DB_GET_BOTH_RANGE
    if (main_flag == DB_GET_BOTH_RANGE) return EINVAL;
#endif

    DBT primary_key;
    memset(&primary_key, 0, sizeof(primary_key));
    return toku_c_pget(c, key, &primary_key, data, flag);
}

1114
/* Close the underlying BRT cursor, then free the DBC and its internal
 * state.  Returns the result of toku_brt_cursor_close(); the handle is
 * freed regardless of that result. */
static int toku_c_close(DBC * c) {
    int close_result = toku_brt_cursor_close(c->i->c);

    toku_free(c->i);
    toku_free(c);
    return close_result;
}

1121 1122 1123 1124 1125
/* Nonzero iff a and b compare equal under the comparison function of the
 * cursor's database. */
static inline int keyeq(DBC *c, DBT *a, DBT *b) {
    DB *db = c->dbp;
    int cmp = db->i->brt->compare_fun(db, a, b);

    return cmp == 0;
}

Rich Prohaska's avatar
Rich Prohaska committed
1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137
/* Count the records that share the key at the cursor's current position.
 *
 * A second cursor is opened on the same database (with no transaction),
 * positioned with DB_SET on the current key, and stepped with DB_NEXT until
 * the key changes or the data runs out.
 *
 * flags must be 0 (EINVAL otherwise).  Returns 0 on success with *count
 * set; *count is 0 when the DB_SET lookup fails.  Errors from reading the
 * current position or opening the second cursor are returned as-is. */
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
    int r;
    DBC *count_cursor = 0;
    DBT currentkey;
    DBT currentval;
    DBT key;
    DBT val;

    memset(&currentkey, 0, sizeof currentkey);
    memset(&currentval, 0, sizeof currentval);
    memset(&key, 0, sizeof key);
    memset(&val, 0, sizeof val);
    currentkey.flags = DB_DBT_REALLOC;
    currentval.flags = DB_DBT_REALLOC;
    key.flags = DB_DBT_REALLOC;
    val.flags = DB_DBT_REALLOC;

    if (flags != 0) {
        r = EINVAL;
        goto finish;
    }

    r = toku_c_get(cursor, &currentkey, &currentval, DB_CURRENT_BINDING);
    if (r != 0) goto finish;

    r = toku_db_cursor(cursor->dbp, 0, &count_cursor, 0);
    if (r != 0) goto finish;

    *count = 0;
    r = toku_c_get(count_cursor, &currentkey, &currentval, DB_SET);
    if (r != 0) {
        /* success, the current key must be deleted and there are no more */
        r = 0;
        goto finish;
    }

    do {
        *count += 1;
    } while (toku_c_get(count_cursor, &key, &val, DB_NEXT) == 0
             && keyeq(count_cursor, &currentkey, &key));
    r = 0; /* success, we found at least one before the end */

finish:
    if (key.data) toku_free(key.data);
    if (val.data) toku_free(val.data);
    if (currentkey.data) toku_free(currentkey.data);
    if (currentval.data) toku_free(currentval.data);
    if (count_cursor) {
        int rr = toku_c_close(count_cursor); assert(rr == 0);
    }
    return r;
}

Yoni Fogel's avatar
 
Yoni Fogel committed
1168 1169 1170
/* Point lookup on a single tree, with no primary/secondary handling.
 *
 * flags must be 0 or DB_GET_BOTH (EINVAL otherwise); we aren't ready to
 * handle flags such as DB_READ_COMMITTED, DB_READ_UNCOMMITTED or DB_RMW.
 *
 * A tree without TOKU_DB_DUPSORT queried with flags 0 uses
 * toku_brt_lookup() directly.  Every other case goes through a temporary
 * cursor, using DB_GET_BOTH when requested and DB_SET otherwise.  The
 * lookup's error takes precedence over the cursor-close error. */
static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    const int get_both = (flags == DB_GET_BOTH);
    unsigned int brtflags;

    if (flags != 0 && !get_both) return EINVAL;

    toku_brt_get_flags(db->i->brt, &brtflags);
    if (!(brtflags & TOKU_DB_DUPSORT) && !get_both)
        return toku_brt_lookup(db->i->brt, key, data);

    DBC *cursor;
    int open_result = toku_db_cursor(db, txn, &cursor, 0);
    if (open_result != 0) return open_result;

    int get_result = toku_c_get_noassociate(cursor, key, data, get_both ? DB_GET_BOTH : DB_SET);
    int close_result = toku_c_close(cursor);
    return get_result != 0 ? get_result : close_result;
}

/* Delete a key from a single tree, with no primary/secondary handling.
 *
 * flags must be 0 or DB_DELETE_ANY (EINVAL otherwise).  DB_DELETE_ANY
 * suppresses the BDB DB->del return value that reports the key was not
 * found before the delete.  Without it, the key is looked up first and the
 * lookup's error is returned if it fails. */
static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
    if (flags != 0 && flags != DB_DELETE_ANY) return EINVAL;

    if (!(flags & DB_DELETE_ANY)) {
        DBT existing;
        memset(&existing, 0, sizeof existing);
        existing.flags = DB_DBT_MALLOC;
        int lookup_result = toku_db_get_noassociate(db, txn, key, &existing, 0);
        if (lookup_result != 0)
            return lookup_result;
        toku_free(existing.data);
    }

    //Do the actual deleting.
    if (txn)
        return toku_brt_delete(db->i->brt, key, txn->i->tokutxn);
    return toku_brt_delete(db->i->brt, key, 0);
}

Yoni Fogel's avatar
 
Yoni Fogel committed
1209
/* Remove from one secondary the entry derived from the primary pair
 * (key, data).
 *
 * The secondary's associate callback computes the secondary key.
 * DB_DONOTINDEX means there is nothing to remove (returns 0); a
 * DB_DBT_MULTIPLE result is not supported (EINVAL).  Any other callback
 * return value is ignored.  A TOKU_DB_DUPSORT secondary is handled with a
 * cursor positioned by DB_GET_BOTH on (secondary key, primary key); any
 * other secondary uses a plain delete with DB_DELETE_ANY.
 *
 * Returns the first error encountered, with delete errors taking precedence
 * over the cursor-close error. */
static int do_associated_deletes(DB_TXN *txn, DBT *key, DBT *data, DB *secondary) {
    u_int32_t brtflags;
    DBT idx;
    memset(&idx, 0, sizeof(idx));
    int r = secondary->i->associate_callback(secondary, key, data, &idx);
    int r2 = 0;
    if (r==DB_DONOTINDEX) return 0;
#ifdef DB_DBT_MULTIPLE
    if (idx.flags & DB_DBT_MULTIPLE) {
        return EINVAL; // We aren't ready for this
    }
#endif
    toku_brt_get_flags(secondary->i->brt, &brtflags);
    if (brtflags & TOKU_DB_DUPSORT) {
        //If the secondary has duplicates we need to use cursor deletes.
        DBC *dbc;
        r = toku_db_cursor(secondary, txn, &dbc, 0);
        if (r == 0) {
            // Close the cursor only if it was actually opened; on an open
            // failure dbc is not a valid handle.
            r = toku_c_get_noassociate(dbc, &idx, key, DB_GET_BOTH);
            if (r == 0)
                r = toku_c_del_noassociate(dbc, 0);
            r2 = toku_c_close(dbc);
        }
    } else {
        r = toku_db_del_noassociate(secondary, txn, &idx, DB_DELETE_ANY);
    }
    if (idx.flags & DB_DBT_APPMALLOC) {
        // The callback allocated idx.data and handed ownership to us.
        toku_free(idx.data);
    }
    if (r!=0) return r;
    return r2;
}

1241
/* Delete the record under the cursor, keeping associated databases in step.
 *
 * When the cursor's database is a secondary, or a primary with secondaries,
 * the current primary (key, data) pair is fetched first.  The matching
 * entry is then removed from every associated secondary except the
 * cursor's own database, and, when the cursor is on a secondary, from the
 * primary as well.  Finally the record under the cursor itself is deleted.
 *
 * Returns 0 on success or the first error encountered; work already done is
 * not rolled back here. */
static int toku_c_del(DBC * c, u_int32_t flags) {
    DB* db = c->dbp;
    HANDLE_PANICKED_DB(db);
    const int is_secondary = (db->i->primary != 0);

    if (is_secondary || !list_empty(&db->i->associated)) {
        DB* pdb = is_secondary ? db->i->primary : db;
        DBT pkey;
        DBT data;
        int r;

        memset(&pkey, 0, sizeof(pkey));
        memset(&data, 0, sizeof(data));
        if (is_secondary) {
            DBT skey;
            memset(&skey, 0, sizeof(skey));
            r = toku_c_pget(c, &skey, &pkey, &data, DB_CURRENT);
        } else {
            r = toku_c_get(c, &pkey, &data, DB_CURRENT);
        }
        if (r != 0) return r;

        struct list *head = &pdb->i->associated;
        struct list *h;
        for (h = list_head(head); h != head; h = h->next) {
            struct __toku_db_internal *dbi = list_struct(h, struct __toku_db_internal, associated);
            if (dbi->db != db) {  //Skip the cursor's own db; it is handled below.
                r = do_associated_deletes(c->i->txn, &pkey, &data, dbi->db);
                if (r != 0) return r;
            }
        }

        if (is_secondary) {
            //The loop above did not delete from the primary.
            //Primaries cannot have duplicates, (noncursor) del is safe.
            r = toku_db_del_noassociate(pdb, c->i->txn, &pkey, DB_DELETE_ANY);
            if (r != 0) return r;
        }
    }
    return toku_c_del_noassociate(c, flags);
}

1283
/* Cursor put.  Not allowed on a secondary, nor on a tree with TOKU_DB_DUP
 * but without TOKU_DB_DUPSORT (EINVAL in both cases).
 *
 * flags selects the mode; it must NOT be 0:
 *   DB_CURRENT   - replace the pair under the cursor.  The current pair is
 *                  read into private storage (DB_KEYEMPTY becomes
 *                  DB_NOTFOUND); on a DUPSORT tree the new data must compare
 *                  equal to the old under dup_compare (else EINVAL).  The
 *                  old pair is deleted and the new data is stored under the
 *                  current key.
 *   DB_KEYFIRST / DB_KEYLAST - store (key, data) directly.
 *   DB_NODUPDATA - DUPSORT trees only (else EINVAL); DB_KEYEXIST if the
 *                  exact pair is already present.
 *   anything else - EINVAL.
 *
 * The store uses DB_YESOVERWRITE; afterwards the cursor is repositioned on
 * the new pair with DB_GET_BOTH.  Returns 0 or the first error. */
static int toku_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
    DB* db = dbc->dbp;
    HANDLE_PANICKED_DB(db);
    unsigned int brtflags;
    int r;
    DBT current_key;
    DBT current_data;
    DBT* put_key = key;
    DBT* get_key = key;
    const int replacing_current = (flags == DB_CURRENT);

    //Cannot c_put in a secondary index.
    if (db->i->primary != 0) return EINVAL;
    toku_brt_get_flags(db->i->brt, &brtflags);
    //We do not support duplicates without sorting.
    if ((brtflags & TOKU_DB_DUP) && !(brtflags & TOKU_DB_DUPSORT)) return EINVAL;

    if (replacing_current) {
        memset(&current_key, 0, sizeof(DBT));
        memset(&current_data, 0, sizeof(DBT));
        //Can't afford to overwrite the local storage.
        current_key.flags = DB_DBT_MALLOC;
        current_data.flags = DB_DBT_MALLOC;
        r = toku_c_get(dbc, &current_key, &current_data, DB_CURRENT);
        if (r == DB_KEYEMPTY) return DB_NOTFOUND;
        if (r != 0) return r;
        if (brtflags & TOKU_DB_DUPSORT) {
            if (db->i->brt->dup_compare(db, &current_data, data) != 0) {
                r = EINVAL;
                goto done;
            }
        }
        //Remove old pair.
        r = toku_c_del(dbc, 0);
        if (r != 0) goto done;
        put_key = get_key = &current_key;
    }
    else if (flags == DB_KEYFIRST || flags == DB_KEYLAST) {
        //Nothing to prepare.
    }
    else if (flags == DB_NODUPDATA) {
        //Must support sorted duplicates.
        if (!(brtflags & TOKU_DB_DUPSORT)) return EINVAL;
        r = toku_c_get(dbc, key, data, DB_GET_BOTH);
        if (r == 0) return DB_KEYEXIST;
        if (r != DB_NOTFOUND) return r;
    }
    else {
        //Flags must NOT be 0.
        return EINVAL;
    }

    //Insert new data with the key chosen above; the put should overwrite.
    r = toku_db_put(db, dbc->i->txn, put_key, data, DB_YESOVERWRITE);
    if (r == 0)
        r = toku_c_get(dbc, get_key, data, DB_GET_BOTH);

done:
    if (replacing_current) {
        toku_free(current_key.data);
        toku_free(current_data.data);
    }
    return r;
}

Rich Prohaska's avatar
Rich Prohaska committed
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376
#if _THREAD_SAFE

/* Calls toku_c_pget() between ydb_lock() and ydb_unlock(); returns its result. */
static int locked_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) {
    int result;

    ydb_lock();
    result = toku_c_pget(c, key, pkey, data, flag);
    ydb_unlock();
    return result;
}

static int locked_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
    ydb_lock(); int r = toku_c_get(c, key, data, flag); ydb_unlock(); return r;
}

static int locked_c_close(DBC * c) {
    ydb_lock(); int r = toku_c_close(c); ydb_unlock(); return r;
}

static int locked_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
    ydb_lock(); int r = toku_c_count(cursor, count, flags); ydb_unlock(); return r;
}

static int locked_c_del(DBC * c, u_int32_t flags) {
    ydb_lock(); int r = toku_c_del(c, flags); ydb_unlock(); return r;
}

static int locked_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
    ydb_lock(); int r = toku_c_put(dbc, key, data, flags); ydb_unlock(); return r;
}

#endif

1377
/* Create a cursor on DB for transaction TXN and store it in *C.
 * Only flags==0 is accepted.
 * Returns 0 on success, EINVAL for nonzero flags, ENOMEM if either the
 * cursor or its internal part cannot be allocated, or the error returned by
 * toku_brt_cursor.  On any failure the memory allocated here is released and
 * *C is left untouched. */
static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    if (flags != 0)
        return EINVAL;
    DBC *MALLOC(result);
    if (result == 0)
        return ENOMEM;
    memset(result, 0, sizeof *result);
    // The public cursor methods are the lock-taking wrappers.
    result->c_get = locked_c_get;
    result->c_pget = locked_c_pget;
    result->c_put = locked_c_put;
    result->c_close = locked_c_close;
    result->c_del = locked_c_del;
    result->c_count = locked_c_count;
    MALLOC(result->i);
    if (result->i == 0) {
        // Report out-of-memory to the caller instead of aborting.
        toku_free(result);
        return ENOMEM;
    }
    result->dbp = db;
    result->i->txn = txn;
    int r = toku_brt_cursor(db->i->brt, &result->i->c);
    if (r != 0) {
        // Propagate the failure; callers already check the return code.
        toku_free(result->i);
        toku_free(result);
        return r;
    }
    *c = result;
    return 0;
}

Rich Prohaska's avatar
Rich Prohaska committed
1401
/* Delete KEY from DB, cascading the delete across associated databases.
 *
 * If DB is neither a secondary nor a primary with secondaries, this is just
 * toku_db_del_noassociate(db, txn, key, flags).
 *
 * If DB has sorted duplicates (it must then be a secondary), every duplicate
 * of KEY is visited with a cursor and removed with toku_c_del; the result of
 * that walk (or of closing the cursor) is returned directly.
 *
 * Otherwise the primary key and data are looked up, do_associated_deletes is
 * run for every database associated with the primary other than DB itself,
 * the primary row is deleted when DB is a secondary, and finally KEY is
 * deleted from DB with DB_DELETE_ANY added to FLAGS.
 *
 * Returns 0 on success or the first nonzero code produced along the way. */
static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    int r;

    //It is a primary with secondaries, or is a secondary.
    if (db->i->primary != 0 || !list_empty(&db->i->associated)) {
        DB* pdb;
        DBT data;
        DBT pkey;
        DBT *pdb_key;
        struct list *h;
        u_int32_t brtflags;

        toku_brt_get_flags(db->i->brt, &brtflags);
        if (brtflags & TOKU_DB_DUPSORT) {
            int r2;
            DBC *dbc;
            BOOL found = FALSE;

            memset(&data, 0, sizeof(data));

            /* If we are deleting all copies from a secondary with duplicates,
             * We have to make certain we cascade all the deletes. */

            assert(db->i->primary!=0);    //Primary cannot have duplicates.
            r = toku_db_cursor(db, txn, &dbc, 0);
            if (r!=0) return r;
            r = toku_c_get_noassociate(dbc, key, &data, DB_SET);
            while (r==0) {
                r = toku_c_del(dbc, 0);
                if (r==0) found = TRUE;
                if (r!=0 && r!=DB_KEYEMPTY) break;
                r = toku_c_get_noassociate(dbc, key, &data, DB_NEXT_DUP);
                if (r == DB_NOTFOUND) {
                    //If we deleted at least one we're happy.  Quit out.
                    if (found) r = 0;
                    break;
                }
            }

            r2 = toku_c_close(dbc);
            if (r != 0) return r;
            return r2;
        }

        // Both DBTs are filled in by the lookups below and released at free_dbts.
        memset(&data, 0, sizeof data); data.flags = DB_DBT_REALLOC;
        memset(&pkey, 0, sizeof pkey); pkey.flags = DB_DBT_REALLOC;

        if (db->i->primary == 0) {
            pdb = db;
            r = toku_db_get(db, txn, key, &data, 0);
            pdb_key = key;
        }
        else {
            pdb = db->i->primary;
            r = toku_db_pget(db, txn, key, &pkey, &data, 0);
            pdb_key = &pkey;
        }
        if (r != 0) goto free_dbts;

        for (h = list_head(&pdb->i->associated); h != &pdb->i->associated; h = h->next) {
            struct __toku_db_internal *dbi = list_struct(h, struct __toku_db_internal, associated);
            if (dbi->db == db) continue;                  //Skip current db (if its primary or secondary)
            r = do_associated_deletes(txn, pdb_key, &data, dbi->db);
            if (r!=0) goto free_dbts;
        }
        if (db->i->primary != 0) {
            //If this is a secondary, we did not delete from the primary.
            //Primaries cannot have duplicates, (noncursor) del is safe.
            r = toku_db_del_noassociate(pdb, txn, pdb_key, DB_DELETE_ANY);
            if (r!=0) goto free_dbts;
        }

    free_dbts:
        // Single exit point for the lookup buffers (reached on success and on error).
        if (data.data) toku_free(data.data);
        if (pkey.data) toku_free(pkey.data);
        if (r != 0) return r;

        //We know for certain it was already found, so no need to return DB_NOTFOUND.
        flags |= DB_DELETE_ANY;
    }
    r = toku_db_del_noassociate(db, txn, key, flags);
    return r;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1493

Rich Prohaska's avatar
Rich Prohaska committed
1494 1495 1496 1497
/* Returns nonzero when DBT carries none of DB_DBT_MALLOC, DB_DBT_REALLOC or
 * DB_DBT_USERMEM.  The callers in this file reject such a DBT with EINVAL
 * when the database was opened with DB_THREAD. */
static inline int db_thread_need_flags(DBT *dbt) {
    // Combine the flags with bitwise OR: a mask built with '+' is only
    // correct while the constants never share a bit.
    return (dbt->flags & (DB_DBT_MALLOC | DB_DBT_REALLOC | DB_DBT_USERMEM)) == 0;
}

1498
/* Look up KEY in DB and return the matching value in DATA.
 * For a database opened with DB_THREAD, DATA must carry one of the memory
 * flags checked by db_thread_need_flags, otherwise EINVAL is returned.
 * On a primary the lookup is toku_db_get_noassociate with FLAGS unchanged.
 * On a secondary, DB_GET_BOTH is rejected with EINVAL, any other nonzero
 * flag trips an assert, and the lookup goes through toku_db_pget with a
 * temporary primary key that is freed before returning. */
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);

    if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
        return EINVAL;

    if (db->i->primary == 0)
        return toku_db_get_noassociate(db, txn, key, data, flags);

    // From here on this is a get on a secondary.
    if (flags == DB_GET_BOTH)
        return EINVAL;
    assert(flags == 0); // We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW

    DBT primary_key;
    memset(&primary_key, 0, sizeof(primary_key));
    primary_key.flags = DB_DBT_MALLOC;

    int status = toku_db_pget(db, txn, key, &primary_key, data, 0);
    if (primary_key.data)
        toku_free(primary_key.data);
    return status;
}

/* Look up KEY in the secondary DB, returning the primary key in PKEY and the
 * data in DATA.  Returns EINVAL when DB has no primary, or when DB was opened
 * with DB_THREAD and PKEY or DATA lacks a memory flag.  FLAGS must be 0
 * (asserted).  The lookup uses a temporary cursor positioned with DB_SET;
 * DB_KEYEMPTY is reported as DB_NOTFOUND.  A lookup error takes precedence
 * over an error from closing the cursor. */
static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    DBC *cursor;

    if (!db->i->primary) return EINVAL; // pget doesn't work on a primary.
    assert(flags==0); // not ready to handle all those other options
    assert(db->i->brt != db->i->primary->i->brt); // Make sure they really are different trees.
    assert(db!=db->i->primary);

    if (db->i->open_flags & DB_THREAD) {
        if (db_thread_need_flags(pkey) || db_thread_need_flags(data))
            return EINVAL;
    }

    int lookup_r = toku_db_cursor(db, txn, &cursor, 0);
    if (lookup_r != 0)
        return lookup_r;

    lookup_r = toku_c_pget(cursor, key, pkey, data, DB_SET);
    if (lookup_r == DB_KEYEMPTY)
        lookup_r = DB_NOTFOUND;

    int close_r = toku_c_close(cursor);
    return (lookup_r != 0) ? lookup_r : close_r;
}

Rich Prohaska's avatar
Rich Prohaska committed
1539
#if 0
1540
/* Unimplemented stub (compiled out by the surrounding #if 0): after the panic
 * check it calls barf() and then abort(). */
static int toku_db_key_range(DB * db, DB_TXN * txn, DBT * dbt, DB_KEY_RANGE * kr, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    // The arguments are intentionally unused.
    (void) txn;
    (void) dbt;
    (void) kr;
    (void) flags;
    barf();
    abort();
}
Rich Prohaska's avatar
Rich Prohaska committed
1546
#endif
Bradley C. Kuszmaul's avatar
Rename  
Bradley C. Kuszmaul committed
1547

Yoni Fogel's avatar
Yoni Fogel committed
1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567
/* Write DIR followed by FNAME into FULL (a buffer of LENGTH bytes), placing a
 * single '/' between them when DIR does not already end with one and FNAME
 * does not start with one.
 * Returns 0 on success, EINVAL when FULL is NULL, and ENAMETOOLONG when the
 * result (including the terminating NUL) does not fit in LENGTH bytes. */
static int construct_full_name_in_buf(const char *dir, const char *fname, char* full, int length) {
    if (full == NULL)
        return EINVAL;

    int used = snprintf(full, length, "%s", dir);
    if (used >= length)
        return ENAMETOOLONG;

    int missing_slash = (used == 0) || (full[used - 1] != '/');
    if (missing_slash) {
        // There must be room for the separator as well as the terminator.
        if (used + 1 == length)
            return ENAMETOOLONG;

        /* Didn't put a slash down. */
        if (fname[0] != '/') {
            full[used] = '/';
            used++;
            full[used] = 0;
        }
    }

    used += snprintf(full + used, length - used, "%s", fname);
    return (used >= length) ? ENAMETOOLONG : 0;
}

Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1568 1569 1570
/* Return a newly toku_malloc'ed string holding DIR joined with FNAME (see
 * construct_full_name_in_buf); DIR is ignored when FNAME starts with '/'.
 * Returns NULL when the name cannot be built.  The caller frees the result
 * with toku_free. */
static char *construct_full_name(const char *dir, const char *fname) {
    // An absolute file name is used as is.
    const char *prefix = (fname[0] == '/') ? "" : dir;

    int prefix_len = strlen(prefix);
    int name_len = strlen(fname);
    // One byte for the '/' in between (which may not be needed), one for the trailing NUL.
    int bufsize = prefix_len + name_len + 2;

    char *path = toku_malloc(bufsize);
    if (construct_full_name_in_buf(prefix, fname, path, bufsize) == 0)
        return path;

    toku_free(path);
    return NULL;
}

1585
/* Work out the full path for database file FNAME in environment DBENV and
 * store a toku_malloc'ed copy in *FULL_NAME_OUT (the caller frees it).
 *
 * When the environment has data directories, each one is probed in order
 * with stat() and the first directory in which the file exists wins.  If the
 * file exists in none of them, the path inside the first data directory is
 * returned.  Without data directories the environment directory is used.
 *
 * Returns 0 on success, ENOMEM when a path cannot be constructed, or the
 * errno of a stat() failure other than ENOENT. */
static int find_db_file(DB_ENV* dbenv, const char *fname, char** full_name_out) {
    u_int32_t i;
    struct stat statbuf;
    char* full_name;

    assert(full_name_out);
    if (dbenv->i->data_dirs!=NULL) {
        assert(dbenv->i->n_data_dirs > 0);
        for (i = 0; i < dbenv->i->n_data_dirs; i++) {
            // Probe the i-th directory (not always the first one).
            full_name = construct_full_name(dbenv->i->data_dirs[i], fname);
            if (!full_name) return ENOMEM;
            if (stat(full_name, &statbuf) == 0) goto finish;

            // stat() reports the reason in errno, not in its return value.
            // Save it before freeing, since the free could overwrite errno.
            int stat_errno = errno;
            toku_free(full_name);
            if (stat_errno != ENOENT) return stat_errno;
        }
        //Did not find it at all.  Return the first data dir.
        full_name = construct_full_name(dbenv->i->data_dirs[0], fname);
    }
    else {
        //Default without data_dirs is the environment directory.
        full_name = construct_full_name(dbenv->i->dir, fname);
    }

finish:
    if (!full_name) return ENOMEM;
    *full_name_out = full_name;
    return 0;
}

1618
/* Open the database DBNAME inside file FNAME on handle DB.
 *
 * DBTYPE must be DB_BTREE or DB_UNKNOWN.  Accepted FLAGS are DB_EXCL,
 * DB_CREATE, DB_RDONLY and DB_THREAD; anything else yields EINVAL, as do
 * DB_EXCL without DB_CREATE, DB_EXCL with DB_UNKNOWN, unsorted duplicates,
 * and a handle that is already open.
 *
 * The file is located with find_db_file.  If it exists, DBNAME is NULL and
 * DB_CREATE was given, DB_EXCL produces EEXIST and otherwise the open is no
 * longer treated as a create.  If it does not exist and DB_CREATE was not
 * given, ENOENT is returned.
 *
 * Returns 0 on success.  On failure after the names were set, both
 * db->i->full_fname and db->i->database_name are freed and reset to NULL. */
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
    HANDLE_PANICKED_DB(db);
    // Warning.  Should check arguments.  Should check return codes on malloc and open and so forth.

    int r;
    if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL;
    int is_db_excl    = flags & DB_EXCL;    flags&=~DB_EXCL;
    int is_db_create  = flags & DB_CREATE;  flags&=~DB_CREATE;
    // DB_RDONLY is accepted and stripped; nothing else in this function uses it.
    flags &= ~DB_RDONLY;
    int is_db_unknown = dbtype == DB_UNKNOWN;
    if (flags & ~DB_THREAD) return EINVAL; // unknown flags

    if (is_db_excl && !is_db_create) return EINVAL;
    if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;

    /* tokudb supports no duplicates and sorted duplicates only */
    unsigned int tflags;
    r = toku_brt_get_flags(db->i->brt, &tflags);
    if (r != 0)
        return r;
    if ((tflags & TOKU_DB_DUP) && !(tflags & TOKU_DB_DUPSORT))
        return EINVAL;

    if (db_opened(db))
        return EINVAL;              /* It was already open. */

    r = find_db_file(db->dbenv, fname, &db->i->full_fname);
    if (r != 0) goto error_cleanup;
    // printf("Full name = %s\n", db->i->full_fname);
    db->i->database_name = toku_strdup(dbname ? dbname : "");
    if (db->i->database_name == 0) {
        r = ENOMEM;
        goto error_cleanup;
    }

    {
        struct stat statbuf;
        if (stat(db->i->full_fname, &statbuf) == 0) {
            /* If the database exists at the file level, and we specified no db_name, then complain here. */
            if (dbname == 0 && is_db_create) {
                if (is_db_excl) {
                    r = EEXIST;
                    goto error_cleanup;
                }
                is_db_create = 0; // It's not a create after all, since the file exists.
            }
        } else {
            if (!is_db_create) {
                r = ENOENT;
                goto error_cleanup;
            }
        }
    }

    // At this point flags holds only DB_THREAD (or nothing).
    db->i->open_flags = flags;
    db->i->open_mode = mode;

    r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname,
                      is_db_create, is_db_excl, is_db_unknown,
                      db->dbenv->i->cachetable,
                      txn ? txn->i->tokutxn : NULL_TXN,
                      db);
    if (r != 0)
        goto error_cleanup;

    return 0;

error_cleanup:
    if (db->i->database_name) {
        toku_free(db->i->database_name);
        db->i->database_name = NULL;
    }
    if (db->i->full_fname) {
        toku_free(db->i->full_fname);
        db->i->full_fname = NULL;
    }
    return r;
}
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
1702

1703 1704
/* Insert KEY/DATA into DB's tree without touching associated databases.
 *
 * The pair is rejected with EINVAL when it is too large for the tree's node
 * size: with sorted duplicates the combined size must be below
 * nodesize/(2*BRT_FANOUT-1); otherwise key and data must each be below
 * nodesize/(3*BRT_FANOUT-1).
 *
 * FLAGS:
 *   DB_YESOVERWRITE  insert or replace.
 *   DB_NOOVERWRITE   return DB_KEYEXIST if KEY is already present.
 *   0                allowed on non-duplicate databases; on DB_DUPSORT
 *                    databases it depends on TDB_EQ_BDB (see below).
 *   anything else    EINVAL.
 *
 * Returns 0 on success, otherwise the codes above, a lookup error, or the
 * result of toku_brt_insert. */
static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    int r;

    unsigned int brtflags;
    r = toku_brt_get_flags(db->i->brt, &brtflags); assert(r == 0);

    /* limit the size of key and data */
    unsigned int nodesize;
    r = toku_brt_get_nodesize(db->i->brt, &nodesize); assert(r == 0);
    if (brtflags & TOKU_DB_DUPSORT) {
        unsigned int limit = nodesize / (2*BRT_FANOUT-1);
        if (key->size + data->size >= limit)
            return EINVAL;
    } else {
        unsigned int limit = nodesize / (3*BRT_FANOUT-1);
        if (key->size >= limit || data->size >= limit)
            // limit is unsigned, so it must be printed with %u.
            return do_error(db->dbenv, EINVAL, "The largest key or data item allowed is %u bytes", limit);
    }

    if (flags == DB_YESOVERWRITE) {
        /* tokudb does insert or replace */
        ;
    } else if (flags == DB_NOOVERWRITE) {
        /* check if the key already exists */
        DBT testfordata;
        r = toku_db_get_noassociate(db, txn, key, toku_init_dbt(&testfordata), 0);
        if (r == 0)
            return DB_KEYEXIST;
        // Only "not there" may fall through to the insert; a real lookup
        // failure is reported instead of being silently ignored.
        if (r != DB_NOTFOUND && r != DB_KEYEMPTY)
            return r;
    } else if (flags != 0) {
        /* no other flags are currently supported */
        return EINVAL;
    } else {
        assert(flags == 0);
        if (brtflags & TOKU_DB_DUPSORT) {
#if TDB_EQ_BDB
            r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH);
            if (r == 0)
                return DB_KEYEXIST;
            if (r != DB_NOTFOUND && r != DB_KEYEMPTY)
                return r;
#else
            return do_error(db->dbenv, EINVAL, "Tokudb requires that db->put specify DB_YESOVERWRITE or DB_NOOVERWRITE on DB_DUPSORT databases");
#endif
        }
    }

    r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0);
    //printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r);
    return r;
}

1752 1753 1754 1755 1756 1757 1758 1759 1760 1761
/* Insert into SECONDARY the index entry for the primary pair KEY/DATA.
 * The secondary's associate callback computes the index key; the entry
 * stored is (index key -> primary KEY), written with DB_YESOVERWRITE.
 * Returns 0 when the callback answers DB_DONOTINDEX, the callback's own
 * error when it fails, EINVAL when the callback asks for DB_DBT_MULTIPLE
 * (not supported here), and otherwise the result of the put. */
static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondary) {
    DBT idx;
    memset(&idx, 0, sizeof(idx));
    int r = secondary->i->associate_callback(secondary, key, data, &idx);
    if (r==DB_DONOTINDEX) return 0;
    // Any other nonzero result is a callback failure: do not insert an index
    // key the callback may never have filled in.
    if (r!=0) return r;
#ifdef DB_DBT_MULTIPLE
    if (idx.flags & DB_DBT_MULTIPLE) {
	return EINVAL; // We aren't ready for this
    }
#endif
    r = toku_db_put_noassociate(secondary, txn, &idx, key, DB_YESOVERWRITE);
    // The callback sets DB_DBT_APPMALLOC when the index buffer must be freed here.
    if (idx.flags & DB_DBT_APPMALLOC) {
        toku_free(idx.data);
    }
    return r;
}

1769
/* Store KEY/DATA in the primary DB and add the matching entry to every
 * database associated with it.  Returns EINVAL when DB is a secondary, and
 * otherwise the first nonzero result from the primary put or from
 * do_associated_inserts; 0 when everything succeeded. */
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);

    //Cannot put directly into a secondary.
    if (db->i->primary != 0)
        return EINVAL;

    int status = toku_db_put_noassociate(db, txn, key, data, flags);
    if (status != 0)
        return status;

    // For each secondary add the relevant records.
    // Only do it if it is a primary.   This loop would run an unknown number of times if we tried it on a secondary.
    assert(db->i->primary==0);
    struct list *head = &db->i->associated;
    struct list *node = list_head(head);
    while (node != head) {
        struct __toku_db_internal *sec = list_struct(node, struct __toku_db_internal, associated);
        status = do_associated_inserts(txn, key, data, sec->db);
        if (status != 0)
            return status;
        node = node->next;
    }
    return 0;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1789

1790
/* Remove a database.
 * With DBNAME: open FNAME/DBNAME on DB, remove that sub-database with
 * toku_brt_remove_subdb, then close DB (the close happens even if the open
 * failed).
 * Without DBNAME: locate the file with find_db_file, close DB and, if the
 * close succeeded, unlink the file.
 * In both cases an earlier error takes precedence over the close result. */
static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    int status;
    int close_status;
    char *path;

    //TODO: Verify DB* db not yet opened
    if (dbname) {
        //TODO: Verify the target db is not open
        //TODO: Use master database (instead of manual edit) when implemented.
        status = toku_db_open(db, NULL, fname, dbname, DB_BTREE, 0, 0777);
        if (status == 0)
            status = toku_brt_remove_subdb(db->i->brt, dbname, flags);
        close_status = toku_db_close(db, 0);
        return status ? status : close_status;
    }

    //TODO: Verify db file not in use. (all dbs in the file must be unused)
    status = find_db_file(db->dbenv, fname, &path);
    if (status != 0)
        return status;
    assert(path);

    close_status = toku_db_close(db, 0);
    // status is known to be 0 here, so only the close result gates the unlink.
    if (close_status == 0 && unlink(path) != 0)
        status = errno;

    toku_free(path);
    return status ? status : close_status;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1818

1819
/* Rename the file NAMEA to NAMEC, both taken relative to the environment
 * directory.  NAMEB must be NULL (asserted) and FLAGS must be 0.
 * Paths are built with construct_full_name_in_buf, so a '/' is inserted
 * between the directory and the name when needed.
 * Returns 0 on success, EINVAL for nonzero flags, ENAMETOOLONG when a path
 * does not fit in PATH_MAX bytes, or the errno set by rename(). */
static int toku_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    if (flags!=0) return EINVAL;
    char afull[PATH_MAX], cfull[PATH_MAX];
    int r;
    assert(nameb == 0);
    r = construct_full_name_in_buf(db->dbenv->i->dir, namea, afull, PATH_MAX);
    if (r != 0) return r;
    r = construct_full_name_in_buf(db->dbenv->i->dir, namec, cfull, PATH_MAX);
    if (r != 0) return r;
    // rename() signals failure with -1; hand the caller the errno value instead.
    if (rename(afull, cfull) != 0) return errno;
    return 0;
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1831

1832
/* Install BT_COMPARE as the key comparison function of DB's tree and return
 * the result of toku_brt_set_bt_compare. */
static int toku_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
    HANDLE_PANICKED_DB(db);
    return toku_brt_set_bt_compare(db->i->brt, bt_compare);
}

1838
/* Install DUP_COMPARE as the duplicate comparison function of DB's tree and
 * return the result of toku_brt_set_dup_compare. */
static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, const DBT *)) {
    HANDLE_PANICKED_DB(db);
    return toku_brt_set_dup_compare(db->i->brt, dup_compare);
}

Rich Prohaska's avatar
Rich Prohaska committed
1844
/* Turn on the duplicate-handling flags requested in FLAGS (DB_DUP and/or
 * DB_DUPSORT) for DB's tree, keeping whatever flags the tree already has.
 * Returns EINVAL when DB is already open and FLAGS is nonzero, otherwise the
 * result of reading or writing the tree flags. */
static int toku_db_set_flags(DB *db, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);

    /* the following matches BDB */
    if (db_opened(db) && flags != 0) return EINVAL;

    u_int32_t tflags;
    int r = toku_brt_get_flags(db->i->brt, &tflags);
    if (r!=0) return r;

    // Use |= rather than +=: adding a bit that is already set would carry
    // into a neighbouring bit and corrupt the flag word.
    if (flags & DB_DUP)
        tflags |= TOKU_DB_DUP;
    if (flags & DB_DUPSORT)
        tflags |= TOKU_DB_DUPSORT;
    r = toku_brt_set_flags(db->i->brt, tflags);
    return r;
}

1862
/* Report in *PFLAGS the DB_DUP / DB_DUPSORT flags corresponding to the
 * TOKU_DB_DUP / TOKU_DB_DUPSORT flags set on DB's tree.  Returns EINVAL when
 * PFLAGS is NULL, or the error from toku_brt_get_flags.  Asserts that the
 * tree carries no flag other than those two. */
static int toku_db_get_flags(DB *db, u_int32_t *pflags) {
    HANDLE_PANICKED_DB(db);
    if (pflags == NULL)
        return EINVAL;

    u_int32_t brt_flags;
    int status = toku_brt_get_flags(db->i->brt, &brt_flags);
    if (status != 0)
        return status;

    u_int32_t db_flags = 0;
    if (brt_flags & TOKU_DB_DUP)
        db_flags |= DB_DUP;
    if (brt_flags & TOKU_DB_DUPSORT)
        db_flags |= DB_DUPSORT;

    // Nothing may be left once the two known flags are removed.
    brt_flags &= ~TOKU_DB_DUP;
    brt_flags &= ~TOKU_DB_DUPSORT;
    assert(brt_flags == 0);

    *pflags = db_flags;
    return 0;
}

1882
/* Set the node size of DB's tree to PAGESIZE and return the result of
 * toku_brt_set_nodesize. */
static int toku_db_set_pagesize(DB *db, u_int32_t pagesize) {
    HANDLE_PANICKED_DB(db);
    return toku_brt_set_nodesize(db->i->brt, pagesize);
}
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1887

Rich Prohaska's avatar
Rich Prohaska committed
1888
#if 0
1889
/* Unimplemented stub (compiled out by the surrounding #if 0): after the panic
 * check it calls barf() and then abort(). */
static int toku_db_stat(DB * db, void *v, u_int32_t flags) {
    HANDLE_PANICKED_DB(db);
    // The arguments are intentionally unused.
    (void) v;
    (void) flags;
    barf();
    abort();
}
Rich Prohaska's avatar
Rich Prohaska committed
1895
#endif
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
1896

Rich Prohaska's avatar
Rich Prohaska committed
1897 1898 1899 1900 1901 1902
/* Fetch the file descriptor of DB's tree into *FDP via toku_brt_get_fd.
 * Returns EINVAL when DB has not been opened. */
static int toku_db_fd(DB *db, int *fdp) {
    HANDLE_PANICKED_DB(db);
    if (db_opened(db))
        return toku_brt_get_fd(db->i->brt, fdp);
    return EINVAL;
}

Rich Prohaska's avatar
Rich Prohaska committed
1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976
#if _THREAD_SAFE

/* DB method wrappers.  Except for locked_db_set_errfile, each one takes the
 * ydb lock, runs the matching toku_db_* routine, releases the lock and
 * returns that routine's result. */

static int locked_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
                                int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_associate(primary, txn, secondary, callback, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_close(DB * db, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_close(db, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_cursor(db, txn, c, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_del(db, txn, key, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_get(db, txn, key, data, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_pget(db, txn, key, pkey, data, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
    int rc;
    ydb_lock();
    rc = toku_db_open(db, txn, fname, dbname, dbtype, flags, mode);
    ydb_unlock();
    return rc;
}

static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_put(db, txn, key, data, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_remove(db, fname, dbname, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_rename(db, namea, nameb, namec, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
    int rc;
    ydb_lock();
    rc = toku_db_set_bt_compare(db, bt_compare);
    ydb_unlock();
    return rc;
}

static int locked_db_set_dup_compare(DB * db, int (*dup_compare) (DB *, const DBT *, const DBT *)) {
    int rc;
    ydb_lock();
    rc = toku_db_set_dup_compare(db, dup_compare);
    ydb_unlock();
    return rc;
}

/* Forwards to the environment's own set_errfile method; no lock is taken here. */
static void locked_db_set_errfile (DB *db, FILE *errfile) {
    db->dbenv->set_errfile(db->dbenv, errfile);
}

static int locked_db_set_flags(DB *db, u_int32_t flags) {
    int rc;
    ydb_lock();
    rc = toku_db_set_flags(db, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_get_flags(DB *db, u_int32_t *flags) {
    int rc;
    ydb_lock();
    rc = toku_db_get_flags(db, flags);
    ydb_unlock();
    return rc;
}

static int locked_db_set_pagesize(DB *db, u_int32_t pagesize) {
    int rc;
    ydb_lock();
    rc = toku_db_set_pagesize(db, pagesize);
    ydb_unlock();
    return rc;
}

static int locked_db_fd(DB *db, int *fdp) {
    int rc;
    ydb_lock();
    rc = toku_db_fd(db, fdp);
    ydb_unlock();
    return rc;
}

#endif

/* Allocate and initialize a DB handle and store it in *db.
 *
 * db:    out parameter; written only when 0 is returned.
 * env:   environment to attach the handle to.  If non-NULL it must already
 *        be opened (otherwise EINVAL is returned) and a reference is added
 *        to it.  If NULL, a new environment is created and opened in "."
 *        with DB_PRIVATE + DB_INIT_MPOOL.
 * flags: must be 0; any other value yields EINVAL.
 *
 * Returns 0 on success, EINVAL for bad arguments, ENOMEM when allocating the
 * handle fails, and otherwise the error code reported by toku_env_create,
 * toku_env_open or toku_brt_create.
 *
 * This function does not take the ydb lock itself; db_create() calls it with
 * the lock held.
 */
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
    int r;

    if (flags) return EINVAL;

    /* if the env already exists then add a ref to it
       otherwise create one */
    if (env) {
        if (!env_opened(env))
            return EINVAL;
        env_add_ref(env);
    } else {
        r = toku_env_create(&env, 0);
        if (r != 0)
            return r;
        r = toku_env_open(env, ".", DB_PRIVATE + DB_INIT_MPOOL, 0);
        if (r != 0) {
            toku_env_close(env, 0);
            return r;
        }
        assert(env_opened(env));
    }

    /* From here on every failure path must release the environment through
       env_unref().
       NOTE(review): for an environment created above this presumably drops
       the reference held since toku_env_create -- confirm. */
    DB *MALLOC(result);
    if (result == 0) {
        r = ENOMEM;
        goto fail_unref_env;
    }
    memset(result, 0, sizeof *result);
    result->dbenv = env;

    /* Method table: every entry is the ydb-lock-taking wrapper. */
    result->associate = locked_db_associate;
    result->close = locked_db_close;
    result->cursor = locked_db_cursor;
    result->del = locked_db_del;
    result->get = locked_db_get;
    //    result->key_range = locked_db_key_range;
    result->open = locked_db_open;
    result->pget = locked_db_pget;
    result->put = locked_db_put;
    result->remove = locked_db_remove;
    result->rename = locked_db_rename;
    result->set_bt_compare = locked_db_set_bt_compare;
    result->set_dup_compare = locked_db_set_dup_compare;
    result->set_errfile = locked_db_set_errfile;
    result->set_pagesize = locked_db_set_pagesize;
    result->set_flags = locked_db_set_flags;
    result->get_flags = locked_db_get_flags;
    //    result->stat = locked_db_stat;
    result->fd = locked_db_fd;

    MALLOC(result->i);
    if (result->i == 0) {
        r = ENOMEM;
        goto fail_free_db;
    }
    memset(result->i, 0, sizeof *result->i);
    result->i->db = result;
    /* The memset above already cleared these fields; they are spelled out to
       list the initial state of the handle explicitly. */
    result->i->freed = 0;
    result->i->header = 0;
    result->i->database_number = 0;
    result->i->full_fname = 0;
    result->i->database_name = 0;
    result->i->open_flags = 0;
    result->i->open_mode = 0;
    result->i->brt = 0;
    list_init(&result->i->associated);
    result->i->primary = 0;
    result->i->associate_callback = 0;

    r = toku_brt_create(&result->i->brt);
    if (r != 0)
        goto fail_free_internal;    /* report the real error code, not a blanket ENOMEM */

    ydb_add_ref();
    *db = result;
    return 0;

    /* Cleanup chain: release in the reverse order of acquisition. */
fail_free_internal:
    toku_free(result->i);
fail_free_db:
    toku_free(result);
fail_unref_env:
    env_unref(env);
    return r;
}
/* Exported entry point for creating a DB handle: runs toku_db_create()
 * between ydb_lock() and ydb_unlock() and returns its result unchanged. */
int db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
    int rc;

    ydb_lock();
    rc = toku_db_create(db, env, flags);
    ydb_unlock();
    return rc;
}

/* TODO: provide a reentrant db_strerror_r for multi-threaded callers;
   db_strerror formats unknown codes into a single static buffer. */

char *db_strerror(int error) {
    char *errorstr;
    if (error >= 0) {
        errorstr = strerror(error);
        if (errorstr)
            return errorstr;
    }
    
2070 2071 2072 2073
    if (error==DB_BADFORMAT) {
	return "Database Bad Format (probably a corrupted database)";
    }

Rich Prohaska's avatar
Rich Prohaska committed
2074
    static char unknown_result[100];    // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
Bradley C. Kuszmaul's avatar
up  
Bradley C. Kuszmaul committed
2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086
    errorstr = unknown_result;
    snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
    return errorstr;
}

const char *db_version(int *major, int *minor, int *patch) {
    if (major)
        *major = DB_VERSION_MAJOR;
    if (minor)
        *minor = DB_VERSION_MINOR;
    if (patch)
        *patch = DB_VERSION_PATCH;
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
2087 2088
    return DB_VERSION_STRING;
}