ha_berkeley.cc 76.7 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
unknown's avatar
unknown committed
2

unknown's avatar
unknown committed
3 4 5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
unknown's avatar
unknown committed
7

unknown's avatar
unknown committed
8 9 10 11
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
unknown's avatar
unknown committed
12

unknown's avatar
unknown committed
13 14 15 16 17 18 19 20 21 22 23
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/*
  TODO:
  - Not compressed keys should use cmp_fix_length_key
  - Don't automatically pack all string keys (To do this we need to modify
    CREATE TABLE so that one can use the pack_keys argument per key).
  - An argument to pack_key that we don't want compression.
unknown's avatar
unknown committed
24
  - DB_DBT_USERMEM should be used for fixed length tables
unknown's avatar
unknown committed
25 26 27
    We will need an updated Berkeley DB version for this.
  - Killing threads that have got a 'deadlock'
  - SHOW TABLE STATUS should give more information about the table.
unknown's avatar
unknown committed
28
  - Get a more accurate count of the number of rows (estimate_rows_upper_bound()).
29 30
    We could store the found number of rows when the table is scanned and
    then increment the counter for each attempted write.
31 32
  - We will need to extend the manager thread to make checkpoints at
     given intervals.
unknown's avatar
unknown committed
33 34 35 36 37 38 39
  - When not using UPDATE IGNORE, don't make a sub transaction but abort
    the main transaction on errors.
  - Handling of drop table during autocommit=0 ?
    (Should we just give an error in this case if there is a pending
    transaction ?)
  - When using ALTER TABLE IGNORE, we should not start a transaction, but do
    everything without transactions.
40 41
  - When we do rollback, we need to subtract the number of changed rows
    from the updated tables.
unknown's avatar
unknown committed
42 43

  Testing of:
44 45
  - Mark tables that participate in a transaction so that they are not
    closed during the transaction.  We need to test what happens if
unknown's avatar
unknown committed
46
    MySQL closes a table that is updated by a not committed transaction.
unknown's avatar
unknown committed
47 48 49 50 51 52 53 54 55 56 57 58 59
*/


#ifdef __GNUC__
#pragma implementation				// gcc: Class implementation
#endif

#include "mysql_priv.h"
#ifdef HAVE_BERKELEY_DB
#include <m_ctype.h>
#include <myisampack.h>
#include <hash.h>
#include "ha_berkeley.h"
60
#include "sql_manager.h"
61
#include <stdarg.h>
unknown's avatar
unknown committed
62 63

/*
  Tuning constants, share-status bits and configuration variables of the
  Berkeley DB handler.

  berkeley_init() reads the berkeley_* variables when it configures and opens
  the environment: berkeley_home, berkeley_tmpdir and berkeley_logdir are
  directories; berkeley_cache_size, berkeley_log_buffer_size and
  berkeley_log_file_size are handed to set_cachesize, set_lg_bsize and
  set_lg_max; berkeley_lock_type goes to set_lk_detect and berkeley_max_lock
  (when non-zero) to set_lk_max; berkeley_init_flags is or-ed into the flags
  of DB_ENV open and berkeley_env_flags is passed to set_flags.

  db_env is the environment handle created by berkeley_init() and
  bdb_open_tables is the hash initialised there with bdb_get_key as key
  function.  NOTE(review): bdb_mutex presumably guards bdb_open_tables;
  confirm against get_share() and free_share().
*/
#define HA_BERKELEY_ROWS_IN_TABLE 10000 /* to get optimization right */
unknown's avatar
unknown committed
64
#define HA_BERKELEY_RANGE_COUNT   100
65
#define HA_BERKELEY_MAX_ROWS	  10000000 /* Max rows in table */
unknown's avatar
unknown committed
66
/* extra rows for estimate_rows_upper_bound() */
67 68 69 70 71 72
#define HA_BERKELEY_EXTRA_ROWS	  100

/* Bits for share->status */
#define STATUS_PRIMARY_KEY_INIT 1
#define STATUS_ROW_COUNT_INIT	2
#define STATUS_BDB_ANALYZE	4
unknown's avatar
unknown committed
73 74

const char *ha_berkeley_ext=".db";
75
bool berkeley_shared_data=0;
unknown's avatar
unknown committed
76 77
u_int32_t berkeley_init_flags= DB_PRIVATE | DB_RECOVER, berkeley_env_flags=0,
          berkeley_lock_type=DB_LOCK_DEFAULT;
unknown's avatar
unknown committed
78
ulong berkeley_cache_size, berkeley_log_buffer_size, berkeley_log_file_size=0;
unknown's avatar
unknown committed
79 80
char *berkeley_home, *berkeley_tmpdir, *berkeley_logdir;
long berkeley_lock_scan_time=0;
81
ulong berkeley_trans_retry=1;
82
ulong berkeley_max_lock;
unknown's avatar
unknown committed
83 84 85 86 87 88
pthread_mutex_t bdb_mutex;

static DB_ENV *db_env;
static HASH bdb_open_tables;

const char *berkeley_lock_names[] =
89
{ "DEFAULT", "OLDEST","RANDOM","YOUNGEST",0 };
unknown's avatar
unknown committed
90 91
u_int32_t berkeley_lock_types[]=
{ DB_LOCK_DEFAULT, DB_LOCK_OLDEST, DB_LOCK_RANDOM };
92
TYPELIB berkeley_lock_typelib= {array_elements(berkeley_lock_names)-1,"",
93
				berkeley_lock_names, NULL};
unknown's avatar
unknown committed
94 95 96 97

static void berkeley_print_error(const char *db_errpfx, char *buffer);
static byte* bdb_get_key(BDB_SHARE *share,uint *length,
			 my_bool not_used __attribute__((unused)));
98
static BDB_SHARE *get_share(const char *table_name, TABLE *table);
unknown's avatar
unknown committed
99 100
static int free_share(BDB_SHARE *share, TABLE *table, uint hidden_primary_key,
		      bool mutex_is_locked);
unknown's avatar
unknown committed
101
static int write_status(DB *status_block, char *buff, uint length);
102
static void update_status(BDB_SHARE *share, TABLE *table);
103
static void berkeley_noticecall(DB_ENV *db_env, db_notices notice);
unknown's avatar
unknown committed
104 105


106

unknown's avatar
unknown committed
107 108 109 110 111 112 113 114 115 116
/* General functions */

bool berkeley_init(void)
{
  DBUG_ENTER("berkeley_init");

  if (!berkeley_tmpdir)
    berkeley_tmpdir=mysql_tmpdir;
  if (!berkeley_home)
    berkeley_home=mysql_real_data_home;
117 118
  DBUG_PRINT("bdb",("berkeley_home: %s",mysql_real_data_home));

unknown's avatar
unknown committed
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  /*
    If we don't set set_lg_bsize() we will get into trouble when
    trying to use many open BDB tables.
    If log buffer is not set, assume that the we will need 512 byte per
    open table.  This is a number that we have reached by testing.
  */
  if (!berkeley_log_buffer_size)
  {
    berkeley_log_buffer_size= max(table_cache_size*512,32*1024);
  }
  /*
    Berkeley DB require that
    berkeley_log_file_size >= berkeley_log_buffer_size*4
  */
  berkeley_log_file_size= berkeley_log_buffer_size*4;
  berkeley_log_file_size= MY_ALIGN(berkeley_log_file_size,1024*1024L);
  berkeley_log_file_size= max(berkeley_log_file_size, 10*1024*1024L);
unknown's avatar
unknown committed
136 137

  if (db_env_create(&db_env,0))
unknown's avatar
unknown committed
138
    DBUG_RETURN(1); /* purecov: inspected */
unknown's avatar
unknown committed
139 140
  db_env->set_errcall(db_env,berkeley_print_error);
  db_env->set_errpfx(db_env,"bdb");
141
  db_env->set_noticecall(db_env, berkeley_noticecall);
unknown's avatar
unknown committed
142 143
  db_env->set_tmp_dir(db_env, berkeley_tmpdir);
  db_env->set_data_dir(db_env, mysql_data_home);
unknown's avatar
unknown committed
144
  db_env->set_flags(db_env, berkeley_env_flags, 1);
unknown's avatar
unknown committed
145
  if (berkeley_logdir)
unknown's avatar
unknown committed
146
    db_env->set_lg_dir(db_env, berkeley_logdir); /* purecov: tested */
unknown's avatar
unknown committed
147 148 149 150 151

  if (opt_endinfo)
    db_env->set_verbose(db_env,
			DB_VERB_CHKPOINT | DB_VERB_DEADLOCK | DB_VERB_RECOVERY,
			1);
unknown's avatar
unknown committed
152

unknown's avatar
unknown committed
153
  db_env->set_cachesize(db_env, 0, berkeley_cache_size, 0);
unknown's avatar
unknown committed
154 155
  db_env->set_lg_max(db_env, berkeley_log_file_size);
  db_env->set_lg_bsize(db_env, berkeley_log_buffer_size);
unknown's avatar
unknown committed
156
  db_env->set_lk_detect(db_env, berkeley_lock_type);
157 158
  if (berkeley_max_lock)
    db_env->set_lk_max(db_env, berkeley_max_lock);
159

unknown's avatar
unknown committed
160 161
  if (db_env->open(db_env,
		   berkeley_home,
unknown's avatar
unknown committed
162
		   berkeley_init_flags |  DB_INIT_LOCK |
unknown's avatar
unknown committed
163
		   DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN |
164
		   DB_CREATE | DB_THREAD, 0666))
unknown's avatar
unknown committed
165
  {
unknown's avatar
unknown committed
166 167
    db_env->close(db_env,0); /* purecov: inspected */
    db_env=0; /* purecov: inspected */
unknown's avatar
unknown committed
168
  }
169

unknown's avatar
unknown committed
170
  (void) hash_init(&bdb_open_tables,system_charset_info,32,0,0,
unknown's avatar
unknown committed
171
		   (hash_get_key) bdb_get_key,0,0);
172
  pthread_mutex_init(&bdb_mutex,MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
173 174 175 176 177 178 179 180 181
  DBUG_RETURN(db_env == 0);
}


bool berkeley_end(void)
{
  int error;
  DBUG_ENTER("berkeley_end");
  if (!db_env)
unknown's avatar
unknown committed
182
    return 1; /* purecov: tested */
183
  berkeley_cleanup_log_files();
unknown's avatar
unknown committed
184 185 186 187 188 189 190 191 192 193 194 195
  error=db_env->close(db_env,0);		// Error is logged
  db_env=0;
  hash_free(&bdb_open_tables);
  pthread_mutex_destroy(&bdb_mutex);
  DBUG_RETURN(error != 0);
}

bool berkeley_flush_logs()
{
  int error;
  bool result=0;
  DBUG_ENTER("berkeley_flush_logs");
unknown's avatar
unknown committed
196
  if ((error=db_env->log_flush(db_env,0)))
unknown's avatar
unknown committed
197
  {
198 199
    my_error(ER_ERROR_DURING_FLUSH_LOGS,MYF(0),error); /* purecov: inspected */
    result=1; /* purecov: inspected */
unknown's avatar
unknown committed
200
  }
unknown's avatar
unknown committed
201
  if ((error=db_env->txn_checkpoint(db_env,0,0,0)))
unknown's avatar
unknown committed
202
  {
203 204
    my_error(ER_ERROR_DURING_CHECKPOINT,MYF(0),error); /* purecov: inspected */
    result=1; /* purecov: inspected */
unknown's avatar
unknown committed
205 206 207 208 209
  }
  DBUG_RETURN(result);
}


210
/*
  Commit the Berkeley DB transaction 'trans'.

  thd is only used to tell in the debug trace whether 'trans' is the
  statement transaction or the "all" transaction.

  RETURN
    Value returned by txn_commit() (0 means no error)
*/

int berkeley_commit(THD *thd, void *trans)
{
  int rc;
  DBUG_ENTER("berkeley_commit");
  DBUG_PRINT("trans",("ending transaction %s",
		      trans == thd->transaction.stmt.bdb_tid ? "stmt" : "all"));
  rc= txn_commit((DB_TXN*) trans,0);
#ifndef DBUG_OFF
  if (rc != 0)
  {
    DBUG_PRINT("error",("error: %d",rc)); /* purecov: inspected */
  }
#endif
  DBUG_RETURN(rc);
}

223
/*
  Abort the Berkeley DB transaction 'trans'.

  thd is only used to tell in the debug trace whether 'trans' is the
  statement transaction or the "all" transaction.

  RETURN
    Value returned by txn_abort()
*/

int berkeley_rollback(THD *thd, void *trans)
{
  int rc;
  DBUG_ENTER("berkeley_rollback");
  DBUG_PRINT("trans",("aborting transaction %s",
		      trans == thd->transaction.stmt.bdb_tid ? "stmt" : "all"));
  rc= txn_abort((DB_TXN*) trans);
  DBUG_RETURN(rc);
}

unknown's avatar
unknown committed
232

233
/*
  Send one result row per Berkeley DB log file to the client.

  Each row holds the log file name, the string "BDB" and a status that is
  SHOW_LOG_STATUS_FREE when the file also appears in the list returned by
  log_archive(DB_ARCH_ABS), else SHOW_LOG_STATUS_INUSE.  The comparison
  walks both lists in step, so it relies on them having the same order.

  While the function runs, the thread's current MEM_ROOT is replaced by a
  local one which is freed before returning.

  RETURN
    0	ok (also when log_archive() reported DB_NOTFOUND: no log files)
    #	Berkeley DB error code, or 1 if a row could not be written
*/

int berkeley_show_logs(Protocol *protocol)
{
  char **all_logs, **free_logs, **log_name, **free_name;
  int error=1;
  MEM_ROOT **root_ptr= my_pthread_getspecific_ptr(MEM_ROOT**,THR_MALLOC);
  MEM_ROOT show_logs_root, *old_mem_root= *root_ptr;
  DBUG_ENTER("berkeley_show_logs");

  init_sql_alloc(&show_logs_root, BDB_LOG_ALLOC_BLOCK_SIZE,
		 BDB_LOG_ALLOC_BLOCK_SIZE);
  *root_ptr= &show_logs_root;

  /* Fetch all log files first; only if that worked, the removable ones */
  error= db_env->log_archive(db_env, &all_logs, DB_ARCH_ABS | DB_ARCH_LOG);
  if (!error)
    error= db_env->log_archive(db_env, &free_logs, DB_ARCH_ABS);
  if (error)
  {
    DBUG_PRINT("error", ("log_archive failed (error %d)", error));
    db_env->err(db_env, error, "log_archive: DB_ARCH_ABS");
    if (error== DB_NOTFOUND)
      error=0;					// No log files
    goto err;
  }

  /* Error is 0 here */
  if (all_logs)
  {
    free_name= free_logs;
    for (log_name= all_logs; *log_name; ++log_name)
    {
      protocol->prepare_for_resend();
      protocol->store(*log_name, system_charset_info);
      protocol->store("BDB", 3, system_charset_info);
      if (free_name && *free_name && strcmp(*log_name, *free_name) == 0)
      {
	/* Current file is the next removable one: consume it */
	free_name++;
	protocol->store(SHOW_LOG_STATUS_FREE, system_charset_info);
      }
      else
	protocol->store(SHOW_LOG_STATUS_INUSE, system_charset_info);

      if (protocol->write())
      {
	error=1;
	goto err;
      }
    }
  }

err:
  free_root(&show_logs_root,MYF(0));
  *root_ptr= old_mem_root;
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
283

unknown's avatar
unknown committed
284

unknown's avatar
unknown committed
285 286
static void berkeley_print_error(const char *db_errpfx, char *buffer)
{
287
  sql_print_error("%s:  %s",db_errpfx,buffer); /* purecov: tested */
unknown's avatar
unknown committed
288 289
}

unknown's avatar
unknown committed
290

291 292 293 294
/*
  Notice callback registered with set_noticecall() in berkeley_init().

  On DB_NOTICE_LOGFILE_CHANGED it sets MANAGER_BERKELEY_LOG_CLEANUP in
  manager_status under LOCK_manager and signals COND_manager.  Every other
  notice is ignored.
*/

static void berkeley_noticecall(DB_ENV *db_env, db_notices notice)
{
  if (notice != DB_NOTICE_LOGFILE_CHANGED)
    return;

  /* purecov: tested */
  pthread_mutex_lock(&LOCK_manager);
  manager_status |= MANAGER_BERKELEY_LOG_CLEANUP;
  pthread_mutex_unlock(&LOCK_manager);
  pthread_cond_signal(&COND_manager);
}

void berkeley_cleanup_log_files(void)
{
  DBUG_ENTER("berkeley_cleanup_log_files");
  char **names;
  int error;

unknown's avatar
unknown committed
310 311
// by HF. Sometimes it crashes. TODO - find out why
#ifndef EMBEDDED_LIBRARY
312 313
  /* XXX: Probably this should be done somewhere else, and
   * should be tunable by the user. */
unknown's avatar
unknown committed
314
  if ((error = db_env->txn_checkpoint(db_env, 0, 0, 0)))
315
    my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error); /* purecov: inspected */
unknown's avatar
unknown committed
316
#endif
unknown's avatar
unknown committed
317
  if ((error = db_env->log_archive(db_env, &names, DB_ARCH_ABS)) != 0)
318
  {
319 320 321
    DBUG_PRINT("error", ("log_archive failed (error %d)", error)); /* purecov: inspected */
    db_env->err(db_env, error, "log_archive: DB_ARCH_ABS"); /* purecov: inspected */
    DBUG_VOID_RETURN; /* purecov: inspected */
322 323 324
  }

  if (names)
unknown's avatar
unknown committed
325 326 327 328
  {						/* purecov: tested */
    char **np;					/* purecov: tested */
    for (np = names; *np; ++np)			/* purecov: tested */
      my_delete(*np, MYF(MY_WME));		/* purecov: tested */
329

unknown's avatar
unknown committed
330
    free(names);				/* purecov: tested */
331 332 333 334
  }

  DBUG_VOID_RETURN;
}
unknown's avatar
unknown committed
335 336 337 338 339 340 341 342 343 344


/*****************************************************************************
** Berkeley DB tables
*****************************************************************************/

/* Return the NullS-terminated list of file extensions used by this handler */

const char **ha_berkeley::bas_ext() const
{
  static const char *extensions[]= { ha_berkeley_ext, NullS };
  return extensions;
}


345 346 347 348
/*
  Return the capability flags of key part 'part' of index 'idx'.

  Starts from HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | HA_KEYREAD_ONLY
  | HA_READ_RANGE and removes flags depending on the inspected key parts:
  parts 0..part when all_parts is set, otherwise only 'part'.

  - A BLOB part removes HA_READ_ORDER, HA_KEYREAD_ONLY and HA_READ_RANGE
    and stops the scan.
  - A TEXT/VARTEXT part whose charset is not MY_CS_BINSORT removes
    HA_KEYREAD_ONLY.
*/

ulong ha_berkeley::index_flags(uint idx, uint part, bool all_parts) const
{
  ulong flags= (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | HA_KEYREAD_ONLY
                | HA_READ_RANGE);
  uint first_part= all_parts ? 0 : part;

  for (uint i= first_part ; i <= part ; i++)
  {
    Field *field= table->key_info[idx].key_part[i].field;

    if (field->type() == FIELD_TYPE_BLOB)
    {
      /* We can't use BLOBS to shortcut sorts */
      flags&= ~(HA_READ_ORDER | HA_KEYREAD_ONLY | HA_READ_RANGE);
      break;
    }

    int key_type= (int) field->key_type();
    if (key_type == HA_KEYTYPE_TEXT || key_type == HA_KEYTYPE_VARTEXT)
    {
      /*
        As BDB stores only one copy of equal strings, we can't use key read
        on these. Binary collations do support key read though.
      */
      if (!(field->charset()->state & MY_CS_BINSORT))
        flags&= ~HA_KEYREAD_ONLY;
    }
  }
  return flags;
}


376
static int
377
berkeley_cmp_hidden_key(DB* file, const DBT *new_key, const DBT *saved_key)
378 379 380
{
  ulonglong a=uint5korr((char*) new_key->data);
  ulonglong b=uint5korr((char*) saved_key->data);
unknown's avatar
unknown committed
381
  return  a < b ? -1 : (a > b ? 1 : 0);
382 383
}

unknown's avatar
unknown committed
384
static int
385
berkeley_cmp_packed_key(DB *file, const DBT *new_key, const DBT *saved_key)
unknown's avatar
unknown committed
386
{
387 388
  KEY *key=	      (new_key->app_private ? (KEY*) new_key->app_private :
		       (KEY*) (file->app_private));
unknown's avatar
unknown committed
389 390 391 392 393
  char *new_key_ptr=  (char*) new_key->data;
  char *saved_key_ptr=(char*) saved_key->data;
  KEY_PART_INFO *key_part= key->key_part, *end=key_part+key->key_parts;
  uint key_length=new_key->size;

394
  for (; key_part != end && (int) key_length > 0; key_part++)
unknown's avatar
unknown committed
395 396
  {
    int cmp;
397
    uint length;
unknown's avatar
unknown committed
398 399
    if (key_part->null_bit)
    {
400 401
      if (*new_key_ptr != *saved_key_ptr++)
	return ((int) *new_key_ptr - (int) saved_key_ptr[-1]);
402
      key_length--;
403 404
      if (!*new_key_ptr++)
	continue;
unknown's avatar
unknown committed
405
    }
406 407 408
    if ((cmp= key_part->field->pack_cmp(new_key_ptr,saved_key_ptr,
                                        key_part->length,
                                        key->table->insert_or_update)))
unknown's avatar
unknown committed
409
      return cmp;
410 411
    length= key_part->field->packed_col_length(new_key_ptr,
                                               key_part->length);
unknown's avatar
unknown committed
412 413
    new_key_ptr+=length;
    key_length-=length;
414 415
    saved_key_ptr+=key_part->field->packed_col_length(saved_key_ptr,
						      key_part->length);
unknown's avatar
unknown committed
416
  }
417
  return key->handler.bdb_return_if_eq;
unknown's avatar
unknown committed
418 419 420
}


421 422
/* The following is not yet used; Should be used for fixed length keys */

423
#ifdef NOT_YET
/*
  Btree comparison function for keys where every part occupies exactly
  key_part->length bytes.  Only compiled when NOT_YET is defined.

  RETURN
    <>0	Keys differ
    key->handler.bdb_return_if_eq when all compared parts are equal
*/

static int
berkeley_cmp_fix_length_key(DB *file, const DBT *new_key, const DBT *saved_key)
{
  KEY *key= (KEY*) new_key->app_private;
  if (!key)
    key= (KEY*) (file->app_private);

  char *new_pos=   (char*) new_key->data;
  char *saved_pos= (char*) saved_key->data;
  uint bytes_left= new_key->size;
  KEY_PART_INFO *part= key->key_part;
  KEY_PART_INFO *last= part + key->key_parts;

  while (part != last && (int) bytes_left > 0)
  {
    int cmp= part->field->pack_cmp(new_pos, saved_pos, 0, 0);
    if (cmp)
      return cmp;
    new_pos+=   part->length;
    bytes_left-= part->length;
    saved_pos+= part->length;
    part++;
  }
  return key->handler.bdb_return_if_eq;
}
#endif
unknown's avatar
unknown committed
446

447

448 449 450 451 452 453
/* Compare key against row */

/*
  Compare a packed key against the row in table->record[0].

  A nullable key part starts with a flag byte in the key: 0 for a NULL
  value (no value bytes follow), non-zero otherwise.  The row counts as
  NULL for that part when key_part->null_bit is set at
  key_part->null_offset.

  RETURN
    0	Key matches the row
    1	Key and row differ
*/

static bool
berkeley_key_cmp(TABLE *table, KEY *key_info, const char *key, uint key_length)
{
  KEY_PART_INFO *key_part= key_info->key_part,
		*end=key_part+key_info->key_parts;

  for (; key_part != end && (int) key_length > 0; key_part++)
  {
    int cmp;
    uint length;
    if (key_part->null_bit)
    {
      key_length--;
      /*
	With the current usage, the following case will always be FALSE,
	because NULL keys are sorted before any other key.

	The flag byte expected for this row is 0 if the row value is NULL
	and 1 otherwise.  The conditional needs its own parentheses because
	'!=' binds tighter than '?:'.
      */
      if (*key != ((table->record[0][key_part->null_offset] &
		    key_part->null_bit) ? 0 : 1))
	return 1;
      if (!*key++)				// Null value
	continue;
    }
    /*
      Last argument has to be 0 as we are also using this function to see
      if a key like 'a  ' matched a row with 'a'
    */
    if ((cmp= key_part->field->pack_cmp(key, key_part->length, 0)))
      return cmp;
    length= key_part->field->packed_col_length(key,key_part->length);
    key+= length;
    key_length-= length;
  }
  return 0;					// Identical keys
}

486

487
/*
  Open a Berkeley DB table.

  Steps, in order:
  - Pick the primary key.  When the table has none (table->primary_key >=
    MAX_KEY) a hidden key of BDB_HIDDEN_PRIMARY_KEY_LENGTH bytes is used and
    primary_key is set to table->keys.
  - Allocate key_buff, key_buff2 and primary_key_buff in one block
    (alloc_ptr) and rec_buff separately.
  - Get the BDB_SHARE for the table and take share->mutex.
  - If share->use_count was 0: create and open the "main" database, then one
    database named "keyNN" for every other key, each open wrapped in its own
    transaction.  Keys without HA_NOSAME get DB_DUP.  Finally compute
    ref_length and share->fixed_length_primary_key and store them in the
    share.
  - Release share->mutex, reset the per-handler state and call get_status()
    and info().

  mode == O_RDONLY opens the databases with DB_RDONLY.  test_if_locked is
  not used in this function.

  RETURN
    0	ok
    1	error; my_errno holds the Berkeley DB error code when a Berkeley DB
	call failed

  NOTE(review): on failure of a secondary key, close() is called while
  share->mutex is still held and close() passes mutex_is_locked=0 to
  free_share(); free_share() is defined elsewhere, confirm this is safe.
  NOTE(review): when txn_begin() succeeds but the following open() fails,
  no abort of that transaction is visible here; confirm whether one is
  needed.
*/
int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
unknown's avatar
unknown committed
488 489 490 491 492 493
{
  char name_buff[FN_REFLEN];
  uint open_mode=(mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
  int error;
  DBUG_ENTER("ha_berkeley::open");

494 495 496 497 498
  /* Open primary key */
  hidden_primary_key=0;
  if ((primary_key=table->primary_key) >= MAX_KEY)
  {						// No primary key
    primary_key=table->keys;
499
    key_used_on_scan=MAX_KEY;
500 501
    ref_length=hidden_primary_key=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
  }
502 503
  else
    key_used_on_scan=primary_key;
504

unknown's avatar
unknown committed
505
  /* Need some extra memory in case of packed keys */
506
  uint max_key_length= table->max_key_length + MAX_REF_PARTS*3;
unknown's avatar
unknown committed
507 508 509 510 511
  if (!(alloc_ptr=
	my_multi_malloc(MYF(MY_WME),
			&key_buff,  max_key_length,
			&key_buff2, max_key_length,
			&primary_key_buff,
512 513
			(hidden_primary_key ? 0 :
			 table->key_info[table->primary_key].key_length),
unknown's avatar
unknown committed
514
			NullS)))
515
    DBUG_RETURN(1); /* purecov: inspected */
516 517 518
  if (!(rec_buff= (byte*) my_malloc((alloced_rec_buff_length=
				     table->rec_buff_length),
				    MYF(MY_WME))))
unknown's avatar
unknown committed
519
  {
520 521
    my_free(alloc_ptr,MYF(0)); /* purecov: inspected */
    DBUG_RETURN(1); /* purecov: inspected */
unknown's avatar
unknown committed
522 523
  }

524
  /* Init shared structure */
525
  if (!(share=get_share(name,table)))
unknown's avatar
unknown committed
526
  {
527
    my_free((char*) rec_buff,MYF(0)); /* purecov: inspected */
528 529
    my_free(alloc_ptr,MYF(0)); /* purecov: inspected */
    DBUG_RETURN(1); /* purecov: inspected */
unknown's avatar
unknown committed
530 531
  }
  thr_lock_data_init(&share->lock,&lock,(void*) 0);
532 533
  key_file = share->key_file;
  key_type = share->key_type;
534
  bzero((char*) &current_row,sizeof(current_row));
unknown's avatar
unknown committed
535

536 537 538 539
  /* Fill in shared structure, if needed */
  pthread_mutex_lock(&share->mutex);
  file = share->file;
  if (!share->use_count++)
unknown's avatar
unknown committed
540
  {
541 542
    if ((error=db_create(&file, db_env, 0)))
    {
543
      free_share(share,table, hidden_primary_key,1); /* purecov: inspected */
544
      my_free((char*) rec_buff,MYF(0)); /* purecov: inspected */
545 546 547
      my_free(alloc_ptr,MYF(0)); /* purecov: inspected */
      my_errno=error; /* purecov: inspected */
      DBUG_RETURN(1); /* purecov: inspected */
548 549 550 551 552 553 554 555
    }
    share->file = file;

    file->set_bt_compare(file,
			 (hidden_primary_key ? berkeley_cmp_hidden_key :
			  berkeley_cmp_packed_key));
    if (!hidden_primary_key)
      file->app_private= (void*) (table->key_info+table->primary_key);
unknown's avatar
unknown committed
556 557
    if ((error= txn_begin(db_env, 0, (DB_TXN**) &transaction, 0)) ||
	(error= (file->open(file, transaction,
558 559
			    fn_format(name_buff, name, "", ha_berkeley_ext,
				      2 | 4),
unknown's avatar
unknown committed
560 561
			    "main", DB_BTREE, open_mode, 0))) ||
	(error= transaction->commit(transaction, 0)))
562
    {
563
      free_share(share,table, hidden_primary_key,1); /* purecov: inspected */
564
      my_free((char*) rec_buff,MYF(0)); /* purecov: inspected */
565 566 567
      my_free(alloc_ptr,MYF(0)); /* purecov: inspected */
      my_errno=error; /* purecov: inspected */
      DBUG_RETURN(1); /* purecov: inspected */
568
    }
unknown's avatar
unknown committed
569

570
    /* Open other keys;  These are part of the share structure */
571 572 573 574 575 576 577 578 579 580 581
    key_file[primary_key]=file;
    key_type[primary_key]=DB_NOOVERWRITE;

    DB **ptr=key_file;
    for (uint i=0, used_keys=0; i < table->keys ; i++, ptr++)
    {
      char part[7];
      if (i != primary_key)
      {
	if ((error=db_create(ptr, db_env, 0)))
	{
unknown's avatar
unknown committed
582 583 584
	  close();				/* purecov: inspected */
	  my_errno=error;			/* purecov: inspected */
	  DBUG_RETURN(1);			/* purecov: inspected */
585 586 587 588 589 590
	}
	sprintf(part,"key%02d",++used_keys);
	key_type[i]=table->key_info[i].flags & HA_NOSAME ? DB_NOOVERWRITE : 0;
	(*ptr)->set_bt_compare(*ptr, berkeley_cmp_packed_key);
	(*ptr)->app_private= (void*) (table->key_info+i);
	if (!(table->key_info[i].flags & HA_NOSAME))
unknown's avatar
unknown committed
591 592
	{
	  DBUG_PRINT("bdb",("Setting DB_DUP for key %u", i));
593
	  (*ptr)->set_flags(*ptr, DB_DUP);
unknown's avatar
unknown committed
594
	}
unknown's avatar
unknown committed
595 596 597 598
	if ((error= txn_begin(db_env, 0, (DB_TXN**) &transaction, 0)) ||
	    (error=((*ptr)->open(*ptr, transaction, name_buff, part, DB_BTREE,
				 open_mode, 0))) ||
	    (error= transaction->commit(transaction, 0)))
599
	{
unknown's avatar
unknown committed
600 601 602
	  close();				/* purecov: inspected */
	  my_errno=error;			/* purecov: inspected */
	  DBUG_RETURN(1);			/* purecov: inspected */
603 604 605 606
	}
      }
    }
    /* Calculate pack_length of primary key */
607
    share->fixed_length_primary_key=1;
608 609 610 611 612
    if (!hidden_primary_key)
    {
      ref_length=0;
      KEY_PART_INFO *key_part= table->key_info[primary_key].key_part;
      KEY_PART_INFO *end=key_part+table->key_info[primary_key].key_parts;
613
      for (; key_part != end ; key_part++)
614
	ref_length+= key_part->field->max_packed_col_length(key_part->length);
615
      share->fixed_length_primary_key=
616 617 618
	(ref_length == table->key_info[primary_key].key_length);
      share->status|=STATUS_PRIMARY_KEY_INIT;
    }    
619
    share->ref_length=ref_length;
unknown's avatar
unknown committed
620
  }
621
  ref_length=share->ref_length;			// If second open
622
  pthread_mutex_unlock(&share->mutex);
unknown's avatar
unknown committed
623 624 625

  transaction=0;
  cursor=0;
626
  key_read=0;
627
  block_size=8192;				// Berkeley DB block size
628
  share->fixed_length_row=!(table->db_create_options & HA_OPTION_PACK_RECORD);
unknown's avatar
unknown committed
629

630
  get_status();
unknown's avatar
unknown committed
631
  info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
unknown's avatar
unknown committed
632 633 634 635 636 637 638 639
  DBUG_RETURN(0);
}


int ha_berkeley::close(void)
{
  DBUG_ENTER("ha_berkeley::close");

640
  my_free((char*) rec_buff,MYF(MY_ALLOW_ZERO_PTR));
unknown's avatar
unknown committed
641
  my_free(alloc_ptr,MYF(MY_ALLOW_ZERO_PTR));
unknown's avatar
unknown committed
642
  ha_berkeley::extra(HA_EXTRA_RESET);		// current_row buffer
unknown's avatar
unknown committed
643
  DBUG_RETURN(free_share(share,table, hidden_primary_key,0));
unknown's avatar
unknown committed
644 645 646 647 648 649 650 651 652 653 654 655
}


/* Reallocate buffer if needed */

bool ha_berkeley::fix_rec_buff_for_blob(ulong length)
{
  if (! rec_buff || length > alloced_rec_buff_length)
  {
    byte *newptr;
    if (!(newptr=(byte*) my_realloc((gptr) rec_buff, length,
				    MYF(MY_ALLOW_ZERO_PTR))))
656
      return 1; /* purecov: inspected */
unknown's avatar
unknown committed
657 658 659 660 661 662 663 664 665 666 667 668 669
    rec_buff=newptr;
    alloced_rec_buff_length=length;
  }
  return 0;
}


/* Calculate max length needed for row */

/*
  Return an upper bound for the packed length of the row in 'buf':
  table->reclength plus 2 bytes per field, plus the current length of every
  blob in the row plus 2 bytes per blob.
*/

ulong ha_berkeley::max_row_length(const byte *buf)
{
  ulong length=table->reclength + table->fields*2;
  Field_blob **blob= table->blob_field;

  /* blob_field is a 0-terminated array */
  while (*blob)
  {
    length+= (*blob)->get_length((char*) buf+(*blob)->offset())+2;
    blob++;
  }
  return length;
}


/*
  Pack a row for storage.  If the row is of fixed length, just store the
  row 'as is'.
  If not, we will generate a packed row suitable for storage.
  This will only fail if we don't have enough memory to pack the row, which
  may only happen in rows with blobs,  as the default row length is
  pre-allocated.
*/

684
/*
  Fill 'row' with the storage image of 'record'.

  Fixed length rows are stored as they are: row->data points at 'record'
  itself.  Otherwise the null bytes and every packed field are written to
  rec_buff.  With a hidden primary key, current_ident is appended after the
  row data in both cases; for a new row a fresh value is fetched first with
  get_auto_primary_key().

  RETURN
    0			ok
    HA_ERR_OUT_OF_MEM	rec_buff could not be enlarged for the blobs
*/

int ha_berkeley::pack_row(DBT *row, const byte *record, bool new_row)
{
  bzero((char*) row,sizeof(*row));

  if (share->fixed_length_row)
  {
    if (hidden_primary_key)
    {
      if (new_row)
	get_auto_primary_key(current_ident);
      /* The hidden key lives directly behind the record in its buffer */
      memcpy_fixed((char*) record+table->reclength, (char*) current_ident,
		   BDB_HIDDEN_PRIMARY_KEY_LENGTH);
    }
    row->data=(void*) record;
    row->size=table->reclength+hidden_primary_key;
    return 0;
  }

  if (table->blob_fields &&
      fix_rec_buff_for_blob(max_row_length(record)))
    return HA_ERR_OUT_OF_MEM; /* purecov: inspected */

  /* Copy null bits */
  memcpy(rec_buff, record, table->null_bytes);
  byte *out=rec_buff + table->null_bytes;

  Field **field=table->field;
  while (*field)
  {
    out=(byte*) (*field)->pack((char*) out,
			       (char*) record + (*field)->offset());
    field++;
  }

  if (hidden_primary_key)
  {
    if (new_row)
      get_auto_primary_key(current_ident);
    memcpy_fixed((char*) out, (char*) current_ident,
		 BDB_HIDDEN_PRIMARY_KEY_LENGTH);
    out+=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
  }

  row->data=rec_buff;
  row->size= (size_t) (out - rec_buff);
  return 0;
}


void ha_berkeley::unpack_row(char *record, DBT *row)
{
730
  if (share->fixed_length_row)
731
    memcpy(record,(char*) row->data,table->reclength+hidden_primary_key);
unknown's avatar
unknown committed
732 733 734 735 736 737 738 739 740 741 742 743
  else
  {
    /* Copy null bits */
    const char *ptr= (const char*) row->data;
    memcpy(record, ptr, table->null_bytes);
    ptr+=table->null_bytes;
    for (Field **field=table->field ; *field ; field++)
      ptr= (*field)->unpack(record + (*field)->offset(), ptr);
  }
}


744 745 746 747 748 749
/* Store the key and the primary key into the row */

void ha_berkeley::unpack_key(char *record, DBT *key, uint index)
{
  KEY *key_info=table->key_info+index;
  KEY_PART_INFO *key_part= key_info->key_part,
unknown's avatar
unknown committed
750
		*end=key_part+key_info->key_parts;
751 752

  char *pos=(char*) key->data;
753
  for (; key_part != end; key_part++)
754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
  {
    if (key_part->null_bit)
    {
      if (!*pos++)				// Null value
      {
	/*
	  We don't need to reset the record data as we will not access it
	  if the null data is set
	*/

	record[key_part->null_offset]|=key_part->null_bit;
	continue;
      }
      record[key_part->null_offset]&= ~key_part->null_bit;
    }
769 770
    pos= (char*) key_part->field->unpack_key(record + key_part->field->offset(),
                                             pos, key_part->length);
771 772 773 774
  }
}


unknown's avatar
unknown committed
775 776 777 778 779
/*
  Create a packed key from from a row
  This will never fail as the key buffer is pre allocated.
*/

780 781
DBT *ha_berkeley::create_key(DBT *key, uint keynr, char *buff,
			     const byte *record, int key_length)
unknown's avatar
unknown committed
782
{
783 784 785
  bzero((char*) key,sizeof(*key));
  if (hidden_primary_key && keynr == primary_key)
  {
786
    /* We don't need to set app_private here */
787 788 789 790 791
    key->data=current_ident;
    key->size=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
    return key;
  }

unknown's avatar
unknown committed
792 793 794
  KEY *key_info=table->key_info+keynr;
  KEY_PART_INFO *key_part=key_info->key_part;
  KEY_PART_INFO *end=key_part+key_info->key_parts;
795
  DBUG_ENTER("create_key");
unknown's avatar
unknown committed
796 797

  key->data=buff;
798
  key->app_private= key_info;
799
  for (; key_part != end && key_length > 0; key_part++)
unknown's avatar
unknown committed
800 801 802 803 804 805 806 807 808 809 810 811
  {
    if (key_part->null_bit)
    {
      /* Store 0 if the key part is a NULL part */
      if (record[key_part->null_offset] & key_part->null_bit)
      {
	*buff++ =0;
	key->flags|=DB_DBT_DUPOK;
	continue;
      }
      *buff++ = 1;				// Store NOT NULL marker
    }
812
    buff=key_part->field->pack_key(buff,(char*) (record + key_part->offset),
813
				   key_part->length);
814
    key_length-=key_part->length;
unknown's avatar
unknown committed
815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831
  }
  key->size= (buff  - (char*) key->data);
  DBUG_DUMP("key",(char*) key->data, key->size);
  DBUG_RETURN(key);
}


/*
  Create a packed key from a MySQL unpacked key.

  key         DBT to fill in; it is zeroed first
  keynr       Number of the index the key belongs to
  buff        Buffer that receives the packed key
  key_ptr     MySQL key image to pack
  key_length  Number of bytes of the key image to use

  Returns 'key'.

  For nullable key parts the first byte of the part in the key image is
  tested: if it is 0 a 1 (NOT NULL marker) is stored and the value is
  packed from key_ptr+1, otherwise a 0 is stored, DB_DBT_DUPOK is set in
  key->flags and the part's data is skipped.  Each key part advances the
  key image by key_part->store_length.
*/

DBT *ha_berkeley::pack_key(DBT *key, uint keynr, char *buff,
                           const byte *key_ptr, uint key_length)
{
  KEY *key_info=table->key_info+keynr;
  KEY_PART_INFO *key_part=key_info->key_part;
  KEY_PART_INFO *end=key_part+key_info->key_parts;
  DBUG_ENTER("bdb:pack_key");

  bzero((char*) key,sizeof(*key));
  key->data=buff;
  key->app_private= (void*) key_info;

  /* key_length is unsigned; the cast makes a wrapped value end the loop */
  for (; key_part != end && (int) key_length > 0 ; key_part++)
  {
    uint offset=0;
    if (key_part->null_bit)
    {
      if (!(*buff++ = (*key_ptr == 0)))		// Store 0 if NULL
      {
        key_length-= key_part->store_length;
        key_ptr+=   key_part->store_length;
        key->flags|=DB_DBT_DUPOK;
        continue;
      }
      offset=1;					// Data is at key_ptr+1
    }
    buff=key_part->field->pack_key_from_key_image(buff,(char*) key_ptr+offset,
                                                  key_part->length);
    key_ptr+=key_part->store_length;
    key_length-=key_part->store_length;
  }
  key->size= (buff  - (char*) key->data);
  DBUG_DUMP("key",(char*) key->data, key->size);
  DBUG_RETURN(key);
}


/*
  Insert a row and all its index entries.

  record  Row to insert

  Returns 0 on success, HA_ERR_FOUND_DUPP_KEY when a put reported
  DB_KEYEXIST, or another error code.  last_dup_key is set to the index
  whose put failed.

  With a single index the row is simply put into the primary table.
  Otherwise the primary put is followed by one put per secondary index
  (key -> primary key).  The sequence is retried up to
  berkeley_trans_retry times as long as the result is DB_LOCK_DEADLOCK.
  When using_ignore is set, a failed attempt is undone either by aborting
  the sub transaction (OPTION_INTERNAL_SUBTRANSACTIONS) or by removing
  the keys recorded in changed_keys.

  Uses key_buff for the primary key and key_buff2 for secondary keys.
*/

int ha_berkeley::write_row(byte * record)
{
  DBT row,prim_key,key;
  int error;
  DBUG_ENTER("write_row");

  statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  if (table->next_number_field && record == table->record[0])
    update_auto_increment();
  if ((error=pack_row(&row, record,1)))
    DBUG_RETURN(error); /* purecov: inspected */

  table->insert_or_update= 1;                   // For handling of VARCHAR
  if (table->keys + test(hidden_primary_key) == 1)
  {
    /* Only one index: no secondary keys to maintain, no retry loop */
    error=file->put(file, transaction, create_key(&prim_key, primary_key,
                                                  key_buff, record),
                    &row, key_type[primary_key]);
    last_dup_key=primary_key;
  }
  else
  {
    DB_TXN *sub_trans = transaction;
    /* Don't use sub transactions in temporary tables */
    ulong thd_options = table->tmp_table == NO_TMP_TABLE ? table->in_use->options : 0;
    for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
    {
      key_map changed_keys(0);                  // Keys written in this attempt
      if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
      {
        if ((error=txn_begin(db_env, transaction, &sub_trans, 0))) /* purecov: deadcode */
          break; /* purecov: deadcode */
        DBUG_PRINT("trans",("starting subtransaction")); /* purecov: deadcode */
      }
      if (!(error=file->put(file, sub_trans, create_key(&prim_key, primary_key,
                                                        key_buff, record),
                            &row, key_type[primary_key])))
      {
        changed_keys.set_bit(primary_key);
        for (uint keynr=0 ; keynr < table->keys ; keynr++)
        {
          if (keynr == primary_key)
            continue;
          if ((error=key_file[keynr]->put(key_file[keynr], sub_trans,
                                          create_key(&key, keynr, key_buff2,
                                                     record),
                                          &prim_key, key_type[keynr])))
          {
            last_dup_key=keynr;
            break;
          }
          changed_keys.set_bit(keynr);
        }
      }
      else
        last_dup_key=primary_key;
      if (error)
      {
        /* Remove inserted row */
        DBUG_PRINT("error",("Got error %d",error));
        if (using_ignore)
        {
          int new_error = 0;
          if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
          {
            DBUG_PRINT("trans",("aborting subtransaction")); /* purecov: deadcode */
            new_error=txn_abort(sub_trans); /* purecov: deadcode */
          }
          else if (!changed_keys.is_clear_all())
          {
            /* No sub transaction: remove the keys written so far by hand */
            for (uint keynr=0 ; keynr < table->keys+test(hidden_primary_key) ;
                 keynr++)
            {
              if (changed_keys.is_set(keynr))
              {
                if ((new_error = remove_key(sub_trans, keynr, record,
                                            &prim_key)))
                  break; /* purecov: inspected */
              }
            }
          }
          if (new_error)
          {
            error=new_error;			// This shouldn't happen /* purecov: inspected */
            break; /* purecov: inspected */
          }
        }
      }
      else if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
      {
        DBUG_PRINT("trans",("committing subtransaction")); /* purecov: deadcode */
        error=txn_commit(sub_trans, 0); /* purecov: deadcode */
      }
      /* Only a deadlock is worth another attempt */
      if (error != DB_LOCK_DEADLOCK)
        break;
    }
  }
  table->insert_or_update= 0;
  if (error == DB_KEYEXIST)
    error=HA_ERR_FOUND_DUPP_KEY;
  else if (!error)
    changed_rows++;
  DBUG_RETURN(error);
}


/*
  Check if the value of a key differs between two row images.

  keynr    Number of the index to compare
  old_row  First row image
  new_row  Second row image

  Returns 1 if any key part differs, 0 if all key parts are equal.

  A key part differs if its null bit differs or if its data differs.
  Blob and variable length parts are compared with Field::cmp_binary(),
  all other parts with memcmp() over key_part->length bytes.

  NOTE(review): the data is compared even when both null bits are set;
  confirm that the data area of a NULL column is always the same in
  both row images.
*/

int ha_berkeley::key_cmp(uint keynr, const byte * old_row,
                         const byte * new_row)
{
  KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
  KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;

  for (; key_part != end ; key_part++)
  {
    if (key_part->null_bit)
    {
      if ((old_row[key_part->null_offset] & key_part->null_bit) !=
          (new_row[key_part->null_offset] & key_part->null_bit))
        return 1;
    }
    if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
    {

      if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
                                      (char*) (new_row + key_part->offset),
                                      (ulong) key_part->length))
        return 1;
    }
    else
    {
      if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
                 key_part->length))
        return 1;
    }
  }
  return 0;
}


/*
  Update a row from one value to another.
  Clobbers key_buff2

  trans                Transaction to do the changes in
  primary_key_changed  TRUE if the old entry must be deleted and a new
                       one inserted, FALSE to overwrite the row data
  old_row, old_key     Old row image and its packed primary key
  new_row, new_key     New row image and its packed primary key
  thd_options          Thread option bits; only
                       OPTION_INTERNAL_SUBTRANSACTIONS is tested
  local_using_ignore   If TRUE, and sub transactions are not in use, the
                       old row is put back when inserting the new row
                       fails

  Returns 0 on success or an error code.  If the put of the new row
  fails, last_dup_key is set to primary_key.
*/

int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
                                    const byte * old_row, DBT *old_key,
                                    const byte * new_row, DBT *new_key,
                                    ulong thd_options, bool local_using_ignore)
{
  DBT row;
  int error;
  DBUG_ENTER("update_primary_key");

  if (primary_key_changed)
  {
    // Primary key changed or we are updating a key that can have duplicates.
    // Delete the old row and add a new one
    if (!(error=remove_key(trans, primary_key, old_row, old_key)))
    {
      if (!(error=pack_row(&row, new_row, 0)))
      {
        if ((error=file->put(file, trans, new_key, &row,
                             key_type[primary_key])))
        {
          // Probably a duplicated key; restore old key and row if needed
          last_dup_key=primary_key;
          if (local_using_ignore &&
              !(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
          {
            int new_error;
            if ((new_error=pack_row(&row, old_row, 0)) ||
                (new_error=file->put(file, trans, old_key, &row,
                                     key_type[primary_key])))
              error=new_error;                  // fatal error /* purecov: inspected */
          }
        }
      }
    }
  }
  else
  {
    // Primary key didn't change;  just update the row data
    if (!(error=pack_row(&row, new_row, 0)))
      error=file->put(file, trans, new_key, &row, 0);
  }
  DBUG_RETURN(error);
}

1056 1057 1058 1059 1060 1061
/*
  Restore changed keys, when a non-fatal error aborts the insert/update
  of one row.
  Clobbers keybuff2
*/

unknown's avatar
unknown committed
1062
int ha_berkeley::restore_keys(DB_TXN *trans, key_map *changed_keys,
1063 1064 1065 1066 1067 1068 1069
			      uint primary_key,
			      const byte *old_row, DBT *old_key,
			      const byte *new_row, DBT *new_key,
			      ulong thd_options)
{
  int error;
  DBT tmp_key;
1070
  uint keynr;
1071 1072 1073 1074 1075 1076
  DBUG_ENTER("restore_keys");

  /* Restore the old primary key, and the old row, but don't ignore
     duplicate key failure */
  if ((error=update_primary_key(trans, TRUE, new_row, new_key,
				old_row, old_key, thd_options, FALSE)))
1077
    goto err; /* purecov: inspected */
1078 1079 1080 1081 1082 1083

  /* Remove the new key, and put back the old key
     changed_keys is a map of all non-primary keys that need to be
     rolled back.  The last key set in changed_keys is the one that
     triggered the duplicate key error (it wasn't inserted), so for
     that one just put back the old value. */
unknown's avatar
unknown committed
1084
  if (!changed_keys->is_clear_all())
1085
  {
unknown's avatar
unknown committed
1086
    for (keynr=0 ; keynr < table->keys+test(hidden_primary_key) ; keynr++)
1087
    {
unknown's avatar
unknown committed
1088
      if (changed_keys->is_set(keynr))
unknown's avatar
unknown committed
1089
      {
unknown's avatar
unknown committed
1090
        if (changed_keys->is_prefix(1) &&
unknown's avatar
unknown committed
1091 1092 1093 1094 1095 1096 1097 1098
            (error = remove_key(trans, keynr, new_row, new_key)))
          break; /* purecov: inspected */
        if ((error = key_file[keynr]->put(key_file[keynr], trans,
                                          create_key(&tmp_key, keynr, key_buff2,
                                                     old_row),
                                          old_key, key_type[keynr])))
          break; /* purecov: inspected */
      }
1099 1100
    }
  }
unknown's avatar
unknown committed
1101

1102
err:
unknown's avatar
unknown committed
1103
  DBUG_ASSERT(error != DB_KEYEXIST);
1104 1105
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
1106 1107 1108 1109


int ha_berkeley::update_row(const byte * old_row, byte * new_row)
{
1110
  DBT prim_key, key, old_prim_key;
unknown's avatar
unknown committed
1111 1112
  int error;
  DB_TXN *sub_trans;
1113
  ulong thd_options = table->tmp_table == NO_TMP_TABLE ? table->in_use->options : 0;
unknown's avatar
unknown committed
1114
  bool primary_key_changed;
1115

unknown's avatar
unknown committed
1116
  DBUG_ENTER("update_row");
unknown's avatar
unknown committed
1117
  LINT_INIT(error);
unknown's avatar
unknown committed
1118

1119
  statistic_increment(table->in_use->status_var.ha_update_count,&LOCK_status);
1120 1121
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
    table->timestamp_field->set_time();
1122

1123
  table->insert_or_update= 1;                   // For handling of VARCHAR
1124 1125 1126 1127 1128 1129
  if (hidden_primary_key)
  {
    primary_key_changed=0;
    bzero((char*) &prim_key,sizeof(prim_key));
    prim_key.data= (void*) current_ident;
    prim_key.size=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
unknown's avatar
unknown committed
1130
    old_prim_key=prim_key;
1131 1132 1133
  }
  else
  {
1134
    create_key(&prim_key, primary_key, key_buff, new_row);
unknown's avatar
unknown committed
1135

1136
    if ((primary_key_changed=key_cmp(primary_key, old_row, new_row)))
1137
      create_key(&old_prim_key, primary_key, primary_key_buff, old_row);
1138 1139 1140
    else
      old_prim_key=prim_key;
  }
unknown's avatar
unknown committed
1141

1142
  sub_trans = transaction;
unknown's avatar
unknown committed
1143 1144
  for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
  {
unknown's avatar
unknown committed
1145
    key_map changed_keys(0);
1146 1147
    if (using_ignore &&	(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
    {
1148 1149 1150
      if ((error=txn_begin(db_env, transaction, &sub_trans, 0))) /* purecov: deadcode */
	break; /* purecov: deadcode */
      DBUG_PRINT("trans",("starting subtransaction")); /* purecov: deadcode */
1151
    }
unknown's avatar
unknown committed
1152 1153
    /* Start by updating the primary key */
    if (!(error=update_primary_key(sub_trans, primary_key_changed,
1154 1155 1156
				   old_row, &old_prim_key,
				   new_row, &prim_key,
				   thd_options, using_ignore)))
unknown's avatar
unknown committed
1157 1158 1159 1160 1161 1162 1163 1164
    {
      // Update all other keys
      for (uint keynr=0 ; keynr < table->keys ; keynr++)
      {
	if (keynr == primary_key)
	  continue;
	if (key_cmp(keynr, old_row, new_row) || primary_key_changed)
	{
1165
	  if ((error=remove_key(sub_trans, keynr, old_row, &old_prim_key)))
1166
	  {
1167
	    if (using_ignore && /* purecov: inspected */
1168 1169 1170 1171 1172 1173 1174 1175
		(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
            {
	      int new_error;
	      DBUG_PRINT("trans",("aborting subtransaction"));
	      new_error=txn_abort(sub_trans);
	      if (new_error)
		error = new_error;
	    }
1176
            table->insert_or_update= 0;
1177
	    DBUG_RETURN(error);			// Fatal error /* purecov: inspected */
1178
	  }
1179
	  changed_keys.set_bit(keynr);
1180
	  if ((error=key_file[keynr]->put(key_file[keynr], sub_trans,
1181
					  create_key(&key, keynr, key_buff2,
1182
						     new_row),
unknown's avatar
unknown committed
1183 1184 1185 1186 1187 1188 1189 1190
					  &prim_key, key_type[keynr])))
	  {
	    last_dup_key=keynr;
	    break;
	  }
	}
      }
    }
1191
    if (error)
unknown's avatar
unknown committed
1192 1193
    {
      /* Remove inserted row */
1194 1195 1196 1197 1198 1199
      DBUG_PRINT("error",("Got error %d",error));
      if (using_ignore)
      {
	int new_error = 0;
	if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
	{
1200 1201
	  DBUG_PRINT("trans",("aborting subtransaction")); /* purecov: deadcode */
	  new_error=txn_abort(sub_trans); /* purecov: deadcode */
1202
	}
unknown's avatar
unknown committed
1203
	else if (!changed_keys.is_clear_all())
unknown's avatar
unknown committed
1204
	  new_error=restore_keys(transaction, &changed_keys, primary_key,
1205 1206 1207 1208
				 old_row, &old_prim_key, new_row, &prim_key,
				 thd_options);
	if (new_error)
	{
1209 1210 1211
          /* This shouldn't happen */
	  error=new_error;			/* purecov: inspected */
	  break;                                /* purecov: inspected */
1212 1213 1214 1215 1216
	}
      }
    }
    else if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
    {
1217 1218
      DBUG_PRINT("trans",("committing subtransaction")); /* purecov: deadcode */
      error=txn_commit(sub_trans, 0); /* purecov: deadcode */
unknown's avatar
unknown committed
1219 1220 1221 1222
    }
    if (error != DB_LOCK_DEADLOCK)
      break;
  }
1223
  table->insert_or_update= 0;
unknown's avatar
unknown committed
1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
  if (error == DB_KEYEXIST)
    error=HA_ERR_FOUND_DUPP_KEY;
  DBUG_RETURN(error);
}


/*
  Delete one key
  This uses key_buff2, when keynr != primary key, so it's important that
  a function that calls this doesn't use this buffer for anything else.
*/

1236
int ha_berkeley::remove_key(DB_TXN *trans, uint keynr, const byte *record,
unknown's avatar
unknown committed
1237 1238 1239 1240 1241 1242 1243
			    DBT *prim_key)
{
  int error;
  DBT key;
  DBUG_ENTER("remove_key");
  DBUG_PRINT("enter",("index: %d",keynr));

unknown's avatar
unknown committed
1244 1245 1246 1247 1248
  if (keynr == active_index && cursor)
    error=cursor->c_del(cursor,0);
  else if (keynr == primary_key ||
	   ((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) ==
	    HA_NOSAME))
unknown's avatar
unknown committed
1249
  {						// Unique key
unknown's avatar
unknown committed
1250
    DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2);
1251
    error=key_file[keynr]->del(key_file[keynr], trans,
unknown's avatar
unknown committed
1252 1253
			       keynr == primary_key ?
			       prim_key :
1254
			       create_key(&key, keynr, key_buff2, record),
unknown's avatar
unknown committed
1255 1256 1257 1258 1259 1260 1261 1262 1263
			       0);
  }
  else
  {
    /*
      To delete the not duplicated key, we need to open an cursor on the
      row to find the key to be delete and delete it.
      We will never come here with keynr = primary_key
    */
unknown's avatar
unknown committed
1264
    DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2);
unknown's avatar
unknown committed
1265
    DBC *tmp_cursor;
1266 1267
    if (!(error=key_file[keynr]->cursor(key_file[keynr], trans,
					&tmp_cursor, 0)))
unknown's avatar
unknown committed
1268
    {
1269
      if (!(error=tmp_cursor->c_get(tmp_cursor,
1270 1271
                                    create_key(&key, keynr, key_buff2, record),
                                    prim_key, DB_GET_BOTH | DB_RMW)))
unknown's avatar
unknown committed
1272
      {					// This shouldn't happen
unknown's avatar
unknown committed
1273
	error=tmp_cursor->c_del(tmp_cursor,0);
unknown's avatar
unknown committed
1274
      }
unknown's avatar
unknown committed
1275
      int result=tmp_cursor->c_close(tmp_cursor);
unknown's avatar
unknown committed
1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286
      if (!error)
	error=result;
    }
  }
  DBUG_RETURN(error);
}


/* Delete all keys for new_record */

int ha_berkeley::remove_keys(DB_TXN *trans, const byte *record,
unknown's avatar
unknown committed
1287
			     DBT *new_record, DBT *prim_key, key_map *keys)
unknown's avatar
unknown committed
1288
{
1289
  int result = 0;
unknown's avatar
unknown committed
1290
  for (uint keynr=0 ; keynr < table->keys+test(hidden_primary_key) ; keynr++)
unknown's avatar
unknown committed
1291
  {
unknown's avatar
unknown committed
1292
    if (keys->is_set(keynr))
unknown's avatar
unknown committed
1293
    {
1294
      int new_error=remove_key(trans, keynr, record, prim_key);
unknown's avatar
unknown committed
1295 1296
      if (new_error)
      {
1297 1298
	result=new_error;			// Return last error /* purecov: inspected */
	break;					// Let rollback correct things /* purecov: inspected */
unknown's avatar
unknown committed
1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
      }
    }
  }
  return result;
}


int ha_berkeley::delete_row(const byte * record)
{
  int error;
  DBT row, prim_key;
  key_map keys=table->keys_in_use;
1311
  ulong thd_options = table->tmp_table == NO_TMP_TABLE ? table->in_use->options : 0;
unknown's avatar
unknown committed
1312
  DBUG_ENTER("delete_row");
1313
  statistic_increment(table->in_use->status_var.ha_delete_count,&LOCK_status);
unknown's avatar
unknown committed
1314

1315
  if ((error=pack_row(&row, record, 0)))
1316
    DBUG_RETURN((error)); /* purecov: inspected */
1317
  create_key(&prim_key, primary_key, key_buff, record);
1318
  if (hidden_primary_key)
1319
    keys.set_bit(primary_key);
1320

1321 1322 1323
  /* Subtransactions may be used in order to retry the delete in
     case we get a DB_LOCK_DEADLOCK error. */
  DB_TXN *sub_trans = transaction;
unknown's avatar
unknown committed
1324 1325
  for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
  {
1326 1327
    if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
    {
1328 1329 1330
      if ((error=txn_begin(db_env, transaction, &sub_trans, 0))) /* purecov: deadcode */
	break; /* purecov: deadcode */
      DBUG_PRINT("trans",("starting sub transaction")); /* purecov: deadcode */
1331
    }
unknown's avatar
unknown committed
1332
    error=remove_keys(sub_trans, record, &row, &prim_key, &keys);
1333
    if (!error && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
unknown's avatar
unknown committed
1334
    {
1335 1336
      DBUG_PRINT("trans",("ending sub transaction")); /* purecov: deadcode */
      error=txn_commit(sub_trans, 0); /* purecov: deadcode */
unknown's avatar
unknown committed
1337 1338
    }
    if (error)
1339
    { /* purecov: inspected */
unknown's avatar
unknown committed
1340
      DBUG_PRINT("error",("Got error %d",error));
1341
      if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
unknown's avatar
unknown committed
1342
      {
1343 1344 1345 1346 1347 1348 1349 1350
	/* retry */
	int new_error;
	DBUG_PRINT("trans",("aborting subtransaction"));
	if ((new_error=txn_abort(sub_trans)))
	{
	  error=new_error;			// This shouldn't happen
	  break;
	}
unknown's avatar
unknown committed
1351
      }
1352 1353
      else
	break;					// No retry - return error
unknown's avatar
unknown committed
1354 1355 1356 1357
    }
    if (error != DB_LOCK_DEADLOCK)
      break;
  }
unknown's avatar
unknown committed
1358
#ifdef CANT_COUNT_DELETED_ROWS
1359 1360
  if (!error)
    changed_rows--;
unknown's avatar
unknown committed
1361
#endif
1362
  DBUG_RETURN(error);
unknown's avatar
unknown committed
1363 1364 1365 1366 1367 1368
}


/*
  Prepare for reading through an index by opening a cursor on it.

  keynr  Number of the index to use

  Returns 0 on success or the error from opening the cursor, in which
  case 'cursor' is set to 0.  active_index is set to keynr and last_key
  is zeroed in both cases.
*/

int ha_berkeley::index_init(uint keynr)
{
  int error;
  DBUG_ENTER("ha_berkeley::index_init");
  DBUG_PRINT("enter",("table: '%s'  key: %d", table->real_name, keynr));

  /*
    Under some very rare conditions (like full joins) we may already have
    an active cursor at this point
  */
  if (cursor)
  {
    DBUG_PRINT("note",("Closing active cursor"));
    cursor->c_close(cursor);
  }
  active_index=keynr;
  /*
    The cursor flags used to be chosen with a test on
    table->reginfo.lock_type, but both outcomes were 0, so 0 is passed
    directly.
  */
  if ((error=key_file[keynr]->cursor(key_file[keynr], transaction, &cursor,
                                     0)))
    cursor=0;				// Safety /* purecov: inspected */
  bzero((char*) &last_key,sizeof(last_key));
  DBUG_RETURN(error);
}

int ha_berkeley::index_end()
{
  int error=0;
1394
  DBUG_ENTER("ha_berkely::index_end");
unknown's avatar
unknown committed
1395 1396
  if (cursor)
  {
1397
    DBUG_PRINT("enter",("table: '%s'", table->real_name));
unknown's avatar
unknown committed
1398 1399 1400
    error=cursor->c_close(cursor);
    cursor=0;
  }
unknown's avatar
unknown committed
1401
  active_index=MAX_KEY;
unknown's avatar
unknown committed
1402 1403 1404 1405 1406 1407 1408
  DBUG_RETURN(error);
}


/* What to do after we have read a row based on an index */

int ha_berkeley::read_row(int error, char *buf, uint keynr, DBT *row,
1409
			  DBT *found_key, bool read_next)
unknown's avatar
unknown committed
1410
{
1411
  DBUG_ENTER("ha_berkeley::read_row");
unknown's avatar
unknown committed
1412 1413 1414 1415 1416 1417 1418
  if (error)
  {
    if (error == DB_NOTFOUND || error == DB_KEYEMPTY)
      error=read_next ? HA_ERR_END_OF_FILE : HA_ERR_KEY_NOT_FOUND;
    table->status=STATUS_NOT_FOUND;
    DBUG_RETURN(error);
  }
1419 1420 1421 1422 1423
  if (hidden_primary_key)
    memcpy_fixed(current_ident,
		 (char*) row->data+row->size-BDB_HIDDEN_PRIMARY_KEY_LENGTH,
		 BDB_HIDDEN_PRIMARY_KEY_LENGTH);
  table->status=0;
unknown's avatar
unknown committed
1424 1425
  if (keynr != primary_key)
  {
1426 1427 1428 1429 1430 1431 1432 1433 1434
    /* We only found the primary key.  Now we have to use this to find
       the row data */
    if (key_read && found_key)
    {
      unpack_key(buf,found_key,keynr);
      if (!hidden_primary_key)
	unpack_key(buf,row,primary_key);
      DBUG_RETURN(0);
    }
unknown's avatar
unknown committed
1435 1436
    DBT key;
    bzero((char*) &key,sizeof(key));
1437
    key.data=key_buff;
unknown's avatar
unknown committed
1438
    key.size=row->size;
1439
    key.app_private= (void*) (table->key_info+primary_key);
1440
    memcpy(key_buff,row->data,row->size);
unknown's avatar
unknown committed
1441 1442 1443 1444
    /* Read the data into current_row */
    current_row.flags=DB_DBT_REALLOC;
    if ((error=file->get(file, transaction, &key, &current_row, 0)))
    {
1445 1446
      table->status=STATUS_NOT_FOUND; /* purecov: inspected */
      DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error); /* purecov: inspected */
unknown's avatar
unknown committed
1447 1448 1449 1450 1451 1452 1453 1454
    }
    row= &current_row;
  }
  unpack_row(buf,row);
  DBUG_RETURN(0);
}


1455 1456
/* This is only used to read whole keys */

unknown's avatar
unknown committed
1457 1458 1459
int ha_berkeley::index_read_idx(byte * buf, uint keynr, const byte * key,
				uint key_len, enum ha_rkey_function find_flag)
{
1460
  statistic_increment(table->in_use->status_var.ha_read_key_count,&LOCK_status);
unknown's avatar
unknown committed
1461 1462
  DBUG_ENTER("index_read_idx");
  current_row.flags=DB_DBT_REALLOC;
unknown's avatar
unknown committed
1463
  active_index=MAX_KEY;
1464
  DBUG_RETURN(read_row(key_file[keynr]->get(key_file[keynr], transaction,
unknown's avatar
unknown committed
1465 1466 1467
				 pack_key(&last_key, keynr, key_buff, key,
					  key_len),
				 &current_row,0),
1468
		       (char*) buf, keynr, &current_row, &last_key, 0));
unknown's avatar
unknown committed
1469 1470 1471 1472 1473 1474 1475
}


/*
  Position the cursor of the active index on a key and read the row.

  buf        Record buffer for the found row
  key        MySQL key image to search for
  key_len    Length of the key image
  find_flag  Kind of search to do

  Returns the result of read_row(), or HA_ERR_KEY_NOT_FOUND when an
  exact match was asked for on a partial key and the found key differs.

  HA_READ_BEFORE_KEY is done as HA_READ_KEY_OR_NEXT followed by a step
  to the previous entry, and HA_READ_PREFIX_LAST_OR_PREV as
  HA_READ_AFTER_KEY followed by a step to the previous entry.

  key_info->handler.bdb_return_if_eq is set while the cursor is being
  positioned and reset to 0 afterwards.  Uses key_buff for the packed
  key and, for partial keys, key_buff2 for a copy to compare against.
*/

int ha_berkeley::index_read(byte * buf, const byte * key,
                            uint key_len, enum ha_rkey_function find_flag)
{
  DBT row;
  int error;
  KEY *key_info= &table->key_info[active_index];
  int do_prev= 0;

  DBUG_ENTER("ha_berkeley::index_read");

  statistic_increment(table->in_use->status_var.ha_read_key_count,&LOCK_status);
  bzero((char*) &row,sizeof(row));
  if (find_flag == HA_READ_BEFORE_KEY)
  {
    find_flag= HA_READ_KEY_OR_NEXT;
    do_prev= 1;
  }
  else if (find_flag == HA_READ_PREFIX_LAST_OR_PREV)
  {
    find_flag= HA_READ_AFTER_KEY;
    do_prev= 1;
  }
  /*
    The flag test must be parenthesized: '!' binds tighter than '&', so
    without the parentheses the flags word is negated before the mask is
    applied.
  */
  if (key_len == key_info->key_length &&
      !(key_info->flags & HA_END_SPACE_KEY))
  {
    /* read of whole key */
    if (find_flag == HA_READ_AFTER_KEY)
      key_info->handler.bdb_return_if_eq= 1;
    error=read_row(cursor->c_get(cursor, pack_key(&last_key,
                                                  active_index,
                                                  key_buff,
                                                  key, key_len),
                                 &row,
                                 (find_flag == HA_READ_KEY_EXACT ?
                                  DB_SET : DB_SET_RANGE)),
                   (char*) buf, active_index, &row, (DBT*) 0, 0);
    key_info->handler.bdb_return_if_eq= 0;
  }
  else
  {
    /* read of partial key */
    pack_key(&last_key, active_index, key_buff, key, key_len);
    /* Store for compare */
    memcpy(key_buff2, key_buff, (key_len=last_key.size));
    /*
      If HA_READ_AFTER_KEY is set, return next key, else return first
      matching key.
    */
    key_info->handler.bdb_return_if_eq= (find_flag == HA_READ_AFTER_KEY ?
                                         1 : -1);
    error=read_row(cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE),
                   (char*) buf, active_index, &row, (DBT*) 0, 0);
    key_info->handler.bdb_return_if_eq= 0;
    if (!error && find_flag == HA_READ_KEY_EXACT)
    {
      /* Ensure that we found a key that is equal to the current one */
      if (berkeley_key_cmp(table, key_info, key_buff2, key_len))
        error=HA_ERR_KEY_NOT_FOUND;
    }
  }
  if (do_prev)
  {
    /* Step back one entry; this replaces the result of the read above */
    bzero((char*) &row, sizeof(row));
    error= read_row(cursor->c_get(cursor, &last_key, &row, DB_PREV),
                    (char*) buf, active_index, &row, &last_key, 1);
  }
  DBUG_RETURN(error);
}

1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
/*
  Read last key is solved by reading the next key and then reading
  the previous key

  buf      Record buffer for the found row
  key      MySQL key image to search for
  key_len  Length of the key image

  Returns HA_ERR_KEY_NOT_FOUND if stepping to the previous entry fails or
  if that entry does not compare equal to the searched key.  Otherwise
  the result of the first (DB_SET_RANGE) read is returned.

  NOTE(review): an error from the first read is returned even when the
  step back found a matching key; confirm that this is intended.

  Uses key_buff for the packed key and key_buff2 for a copy to compare
  against.
*/

int ha_berkeley::index_read_last(byte * buf, const byte * key, uint key_len)
{
  DBT row;
  int error;
  KEY *key_info= &table->key_info[active_index];
  DBUG_ENTER("ha_berkeley::index_read_last");

  statistic_increment(table->in_use->status_var.ha_read_key_count,
                      &LOCK_status);
  bzero((char*) &row,sizeof(row));

  /* read of partial key */
  pack_key(&last_key, active_index, key_buff, key, key_len);
  /* Store for compare */
  memcpy(key_buff2, key_buff, (key_len=last_key.size));
  key_info->handler.bdb_return_if_eq= 1;
  error=read_row(cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE),
                 (char*) buf, active_index, &row, (DBT*) 0, 0);
  key_info->handler.bdb_return_if_eq= 0;
  bzero((char*) &row,sizeof(row));
  if (read_row(cursor->c_get(cursor, &last_key, &row, DB_PREV),
               (char*) buf, active_index, &row, &last_key, 1) ||
      berkeley_key_cmp(table, key_info, key_buff2, key_len))
    error=HA_ERR_KEY_NOT_FOUND;
  DBUG_RETURN(error);
}

unknown's avatar
unknown committed
1572 1573 1574 1575 1576

/*
  Move the cursor of the active index one step forward (DB_NEXT) and pass
  the outcome of the cursor call to read_row().
*/
int ha_berkeley::index_next(byte * buf)
{
  DBT row;
  int cursor_result;
  DBUG_ENTER("index_next");

  statistic_increment(table->in_use->status_var.ha_read_next_count,
                      &LOCK_status);
  bzero((char*) &row, sizeof(row));
  cursor_result= cursor->c_get(cursor, &last_key, &row, DB_NEXT);
  DBUG_RETURN(read_row(cursor_result, (char*) buf, active_index,
                       &row, &last_key, 1));
}

/*
  Read the next row from the active index as long as it has the given key.

  SYNOPSIS
    ha_berkeley::index_next_same()
    buf                         Buffer handed to read_row() for the row
    key                         Key the next row is compared against
    keylen                      Length of 'key'

  DESCRIPTION
    When 'keylen' equals the full key length of the active index and the
    index does not have HA_END_SPACE_KEY set, the cursor is advanced with
    DB_NEXT_DUP. Otherwise the cursor is advanced with DB_NEXT and the
    row found is compared against 'key' with key_cmp_if_same().

  RETURN
    Result of read_row(), or HA_ERR_END_OF_FILE when the DB_NEXT row was
    read without error but key_cmp_if_same() returned non-zero.
*/
int ha_berkeley::index_next_same(byte * buf, const byte *key, uint keylen)
{
  DBT row;
  int error;
  DBUG_ENTER("index_next_same");
  statistic_increment(table->in_use->status_var.ha_read_next_count,
		      &LOCK_status);
  bzero((char*) &row,sizeof(row));
  /*
    The flag test has to be parenthesized: '!' binds tighter than '&', so
    "!flags & HA_END_SPACE_KEY" tested (!flags) & HA_END_SPACE_KEY instead
    of checking that the HA_END_SPACE_KEY bit is clear.
  */
  if (keylen == table->key_info[active_index].key_length &&
      !(table->key_info[active_index].flags & HA_END_SPACE_KEY))
    error=read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT_DUP),
		   (char*) buf, active_index, &row, &last_key, 1);
  else
  {
    error=read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT),
		   (char*) buf, active_index, &row, &last_key, 1);
    if (!error && ::key_cmp_if_same(table, key, active_index, keylen))
      error=HA_ERR_END_OF_FILE;
  }
  DBUG_RETURN(error);
}


/*
  Move the cursor of the active index one step backward (DB_PREV) and pass
  the outcome of the cursor call to read_row().
*/
int ha_berkeley::index_prev(byte * buf)
{
  DBT row;
  int cursor_result;
  DBUG_ENTER("index_prev");

  statistic_increment(table->in_use->status_var.ha_read_prev_count,
                      &LOCK_status);
  bzero((char*) &row, sizeof(row));
  cursor_result= cursor->c_get(cursor, &last_key, &row, DB_PREV);
  DBUG_RETURN(read_row(cursor_result, (char*) buf, active_index,
                       &row, &last_key, 1));
}
unknown's avatar
unknown committed
1617

unknown's avatar
unknown committed
1618 1619 1620 1621 1622

/*
  Position the cursor of the active index with DB_FIRST and pass the
  outcome of the cursor call to read_row().
*/
int ha_berkeley::index_first(byte * buf)
{
  DBT row;
  int cursor_result;
  DBUG_ENTER("index_first");

  statistic_increment(table->in_use->status_var.ha_read_first_count,
                      &LOCK_status);
  bzero((char*) &row, sizeof(row));
  cursor_result= cursor->c_get(cursor, &last_key, &row, DB_FIRST);
  DBUG_RETURN(read_row(cursor_result, (char*) buf, active_index,
                       &row, &last_key, 1));
}

/*
  Position the cursor of the active index with DB_LAST and pass the
  outcome of the cursor call to read_row(). Unlike the other index_*
  cursor wrappers, the last read_row() argument is 0 here.
*/
int ha_berkeley::index_last(byte * buf)
{
  DBT row;
  int cursor_result;
  DBUG_ENTER("index_last");

  statistic_increment(table->in_use->status_var.ha_read_last_count,
                      &LOCK_status);
  bzero((char*) &row, sizeof(row));
  cursor_result= cursor->c_get(cursor, &last_key, &row, DB_LAST);
  DBUG_RETURN(read_row(cursor_result, (char*) buf, active_index,
                       &row, &last_key, 0));
}

int ha_berkeley::rnd_init(bool scan)
{
1643
  DBUG_ENTER("rnd_init");
unknown's avatar
unknown committed
1644
  current_row.flags=DB_DBT_REALLOC;
1645
  DBUG_RETURN(index_init(primary_key));
unknown's avatar
unknown committed
1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656
}

/* End a table scan; simply forwards to index_end(). */
int ha_berkeley::rnd_end()
{
  int end_result= index_end();
  return end_result;
}

/*
  Read the next row of a table scan: advance the cursor with DB_NEXT and
  pass the outcome to read_row() using the primary key as key number.
*/
int ha_berkeley::rnd_next(byte *buf)
{
  DBT row;
  int cursor_result;
  DBUG_ENTER("rnd_next");

  statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
                      &LOCK_status);
  bzero((char*) &row, sizeof(row));
  cursor_result= cursor->c_get(cursor, &last_key, &row, DB_NEXT);
  DBUG_RETURN(read_row(cursor_result, (char*) buf, primary_key,
                       &row, &last_key, 1));
}


/*
  Fill 'to' so that it describes the key stored at 'pos'.

  'to' is cleared, its data pointer is set to 'pos' and its size is either
  ref_length (fixed length primary key) or the sum of the packed lengths
  of all primary key parts found at 'pos'. Returns 'to'.
*/
DBT *ha_berkeley::get_pos(DBT *to, byte *pos)
{
  /* We don't need to set app_private here */
  bzero((char*) to, sizeof(*to));
  to->data= pos;

  if (share->fixed_length_primary_key)
  {
    to->size= ref_length;
    return to;
  }

  /* Variable length key: walk over every packed key part */
  KEY *pk_info= &table->key_info[primary_key];
  KEY_PART_INFO *part= pk_info->key_part;
  KEY_PART_INFO *last_part= part + pk_info->key_parts;
  while (part != last_part)
  {
    pos+= part->field->packed_col_length((char*) pos, part->length);
    part++;
  }
  to->size= (uint) (pos - (byte*) to->data);
  return to;
}


/*
  Read a row by the position stored in 'pos': the position is converted
  to a DBT with get_pos(), looked up with file->get() inside the current
  transaction, and the outcome is passed to read_row().
*/
int ha_berkeley::rnd_pos(byte * buf, byte *pos)
{
  DBT db_pos;
  int get_result;
  DBUG_ENTER("ha_berkeley::rnd_pos");

  statistic_increment(table->in_use->status_var.ha_read_rnd_count,
                      &LOCK_status);
  active_index= MAX_KEY;
  get_pos(&db_pos, pos);
  get_result= file->get(file, transaction, &db_pos, &current_row, 0);
  DBUG_RETURN(read_row(get_result, (char*) buf, primary_key,
                       &current_row, (DBT*) 0, 0));
}

1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721
/*
  Set a reference to the current record in (ref,ref_length).

  SYNOPSIS
    ha_berkeley::position()
    record                      The current record buffer

  DESCRIPTION
    The BDB handler stores the primary key in (ref,ref_length).
    There is either an explicit primary key, or an implicit (hidden)
    primary key.
    During open(), 'ref_length' is calculated as the maximum primary
    key length. When an actual key is shorter than that, the rest of
    the buffer must be cleared out. The row cannot be identified, if
    garbage follows behind the end of the key. There is no length
    field for the current key, so that the whole ref_length is used
    for comparison.

  RETURN
    nothing
*/

unknown's avatar
unknown committed
1722 1723 1724
void ha_berkeley::position(const byte *record)
{
  DBT key;
1725
  DBUG_ENTER("ha_berkeley::position");
1726
  if (hidden_primary_key)
1727 1728
  {
    DBUG_ASSERT(ref_length == BDB_HIDDEN_PRIMARY_KEY_LENGTH);
1729
    memcpy_fixed(ref, (char*) current_ident, BDB_HIDDEN_PRIMARY_KEY_LENGTH);
1730
  }
1731
  else
1732
  {
1733
    create_key(&key, primary_key, (char*) ref, record);
1734 1735 1736 1737
    if (key.size < ref_length)
      bzero(ref + key.size, ref_length - key.size);
  }
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
1738 1739 1740 1741 1742
}


/*
  Update handler statistics according to the HA_STATUS_* bits in 'flag':
  row counts, per key rec_per_key values (also refreshed when the share
  version has changed) and the number of the last duplicate key.
*/
void ha_berkeley::info(uint flag)
{
  DBUG_ENTER("ha_berkeley::info");

  if (flag & HA_STATUS_VARIABLE)
  {
    records= share->rows + changed_rows;	// Just to get optimisations right
    deleted= 0;
  }

  if ((flag & HA_STATUS_CONST) || version != share->version)
  {
    version= share->version;
    for (uint keynr= 0 ; keynr < table->keys ; keynr++)
    {
      KEY *key= &table->key_info[keynr];
      /* Only the entry for the last key part is filled in */
      key->rec_per_key[key->key_parts-1]= share->rec_per_key[keynr];
    }
  }

  /* Don't return key if we got an error for the internal primary key */
  if ((flag & HA_STATUS_ERRKEY) && last_dup_key < table->keys)
    errkey= last_dup_key;
  DBUG_VOID_RETURN;
}


int ha_berkeley::extra(enum ha_extra_function operation)
{
1767 1768 1769 1770
  switch (operation) {
  case HA_EXTRA_RESET:
  case HA_EXTRA_RESET_STATE:
    key_read=0;
1771
    using_ignore=0;
unknown's avatar
unknown committed
1772 1773 1774 1775 1776 1777 1778 1779 1780
    if (current_row.flags & (DB_DBT_MALLOC | DB_DBT_REALLOC))
    {
      current_row.flags=0;
      if (current_row.data)
      {
	free(current_row.data);
	current_row.data=0;
      }
    }
1781 1782 1783 1784 1785 1786 1787
    break;
  case HA_EXTRA_KEYREAD:
    key_read=1;					// Query satisfied with key
    break;
  case HA_EXTRA_NO_KEYREAD:
    key_read=0;
    break;
1788 1789 1790 1791 1792 1793
  case HA_EXTRA_IGNORE_DUP_KEY:
    using_ignore=1;
    break;
  case HA_EXTRA_NO_IGNORE_DUP_KEY:
    using_ignore=0;
    break;
1794 1795 1796
  default:
    break;
  }
unknown's avatar
unknown committed
1797 1798 1799
  return 0;
}

1800

unknown's avatar
unknown committed
1801 1802
int ha_berkeley::reset(void)
{
1803
  ha_berkeley::extra(HA_EXTRA_RESET);
1804
  key_read=0;					// Reset to state after open
unknown's avatar
unknown committed
1805 1806 1807 1808 1809 1810 1811
  return 0;
}


/*
  As MySQL will execute an external lock for every new table it uses
  we can use this to start the transactions.
1812 1813 1814 1815
  If we are in auto_commit mode we just need to start a transaction
  for the statement to be able to rollback the statement.
  If not, we have to start a master transaction if there doesn't exist
  one from before.
unknown's avatar
unknown committed
1816 1817 1818 1819 1820 1821 1822 1823
*/

int ha_berkeley::external_lock(THD *thd, int lock_type)
{
  int error= 0;
  DBUG_ENTER("ha_berkeley::external_lock");

  if (lock_type == F_UNLCK)
  {
    lock.type= TL_UNLOCK;			// Unlocked
    /* Publish the rows changed through this handler to the share */
    thread_safe_add(share->rows, changed_rows, &share->mutex);
    changed_rows= 0;
    if (!--thd->transaction.bdb_lock_count &&
        thd->transaction.stmt.bdb_tid)
    {
      /*
        F_UNLOCK is done without a transaction commit / rollback.
        This happens if the thread didn't update any rows
        We must in this case commit the work to keep the row locks
      */
      DBUG_PRINT("trans",("commiting non-updating transaction"));
      error= txn_commit((DB_TXN*) thd->transaction.stmt.bdb_tid, 0);
      thd->transaction.stmt.bdb_tid= 0;
      transaction= 0;
    }
    DBUG_RETURN(error);
  }

  /* Lock request: only the first table lock has to start transactions */
  if (!thd->transaction.bdb_lock_count++)
  {
    DBUG_ASSERT(thd->transaction.stmt.bdb_tid == 0);
    transaction= 0;				// Safety

    if ((thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN |
                         OPTION_TABLE_LOCK)) &&
        !thd->transaction.all.bdb_tid)
    {
      /* We have to start a master transaction */
      DBUG_PRINT("trans",("starting transaction all:  options: 0x%lx",
                          (ulong) thd->options));
      error= txn_begin(db_env, 0,
                       (DB_TXN**) &thd->transaction.all.bdb_tid, 0);
      if (error)
      {
        thd->transaction.bdb_lock_count--;	// We didn't get the lock /* purecov: inspected */
        DBUG_RETURN(error); /* purecov: inspected */
      }
      if (thd->in_lock_tables)
        DBUG_RETURN(0);				// Don't create stmt trans
    }

    DBUG_PRINT("trans",("starting transaction stmt"));
    error= txn_begin(db_env,
                     (DB_TXN*) thd->transaction.all.bdb_tid,
                     (DB_TXN**) &thd->transaction.stmt.bdb_tid, 0);
    if (error)
    {
      /* We leave the possible master transaction open */
      thd->transaction.bdb_lock_count--;	// We didn't get the lock /* purecov: inspected */
      DBUG_RETURN(error); /* purecov: inspected */
    }
  }
  transaction= (DB_TXN*) thd->transaction.stmt.bdb_tid;
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
1882

unknown's avatar
unknown committed
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895

/*
  When using LOCK TABLES, external_lock is only called when the actual
  TABLE LOCK is done.
  Under LOCK TABLES, each used table will force a call to start_stmt.
*/

/*
  Make sure a statement transaction exists for 'thd' (starting one as a
  child of the 'all' transaction if needed) and use it for this handler.
  Returns the result of txn_begin(), or 0 if no transaction was started.
*/
int ha_berkeley::start_stmt(THD *thd)
{
  int error= 0;
  DBUG_ENTER("ha_berkeley::start_stmt");

  if (thd->transaction.stmt.bdb_tid == 0)
  {
    DB_TXN *parent_txn= (DB_TXN*) thd->transaction.all.bdb_tid;
    DB_TXN **stmt_txn= (DB_TXN**) &thd->transaction.stmt.bdb_tid;
    DBUG_PRINT("trans",("starting transaction stmt"));
    error= txn_begin(db_env, parent_txn, stmt_txn, 0);
  }
  transaction= (DB_TXN*) thd->transaction.stmt.bdb_tid;
  DBUG_RETURN(error);
}


1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933
/*
  The idea with handler::store_lock() is the following:

  The statement decided which locks we should need for the table
  for updates/deletes/inserts we get WRITE locks, for SELECT... we get
  read locks.

  Before adding the lock into the table lock handler (see thr_lock.c)
  mysqld calls store lock with the requested locks.  Store lock can now
  modify a write lock to a read lock (or some other lock), ignore the
  lock (if we don't want to use MySQL table locks at all) or add locks
  for many tables (like we do when we are using a MERGE handler).

  Berkeley DB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
  signals that we are doing WRITES, but we are still allowing other
  readers and writers).

  When releasing locks, store_lock() is also called. In this case one
  usually doesn't have to do anything.

  In some exceptional cases MySQL may send a request for a TL_IGNORE;
  This means that we are requesting the same lock as last time and this
  should also be ignored. (This may happen when someone does a flush
  table when we have opened a part of the tables, in which case mysqld
  closes and reopens the tables and tries to get the same locks as last
  time).  In the future we will probably try to remove this.
*/

unknown's avatar
unknown committed
1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945

/*
  Register the lock of this handler in 'to'. A new lock type is only
  stored when the request is not TL_IGNORE and the handler is currently
  unlocked. Returns the position after the stored entry.
*/
THR_LOCK_DATA **ha_berkeley::store_lock(THD *thd, THR_LOCK_DATA **to,
                                        enum thr_lock_type lock_type)
{
  const bool store_new_type= (lock_type != TL_IGNORE &&
                              lock.type == TL_UNLOCK);
  if (store_new_type)
  {
    const bool write_request= (lock_type >= TL_WRITE_CONCURRENT_INSERT &&
                               lock_type <= TL_WRITE);
    /* If we are not doing a LOCK TABLE, then allow multiple writers */
    if (write_request && !thd->in_lock_tables)
      lock_type= TL_WRITE_ALLOW_WRITE;
    lock.type= lock_type;

    if (table->reginfo.lock_type > TL_WRITE_ALLOW_READ)
      lock_on_read= DB_RMW;
    else
      lock_on_read= 0;
  }
  *to= &lock;
  return to + 1;
}


static int create_sub_table(const char *table_name, const char *sub_name,
			    DBTYPE type, int flags)
{
1957
  int error;
unknown's avatar
unknown committed
1958 1959
  DB *file;
  DBUG_ENTER("create_sub_table");
unknown's avatar
unknown committed
1960
  DBUG_PRINT("enter",("sub_name: %s  flags: %d",sub_name, flags));
unknown's avatar
unknown committed
1961 1962 1963 1964

  if (!(error=db_create(&file, db_env, 0)))
  {
    file->set_flags(file, flags);
unknown's avatar
unknown committed
1965
    error=(file->open(file, NULL, table_name, sub_name, type,
unknown's avatar
unknown committed
1966 1967 1968
		      DB_THREAD | DB_CREATE, my_umask));
    if (error)
    {
1969 1970 1971
      DBUG_PRINT("error",("Got error: %d when opening table '%s'",error, /* purecov: inspected */
			  table_name)); /* purecov: inspected */
      (void) file->remove(file,table_name,NULL,0); /* purecov: inspected */
unknown's avatar
unknown committed
1972 1973 1974 1975 1976 1977
    }
    else
      (void) file->close(file,0);
  }
  else
  {
1978
    DBUG_PRINT("error",("Got error: %d when creting table",error)); /* purecov: inspected */
unknown's avatar
unknown committed
1979 1980
  }
  if (error)
1981
    my_errno=error; /* purecov: inspected */
unknown's avatar
unknown committed
1982 1983 1984 1985 1986 1987 1988 1989 1990
  DBUG_RETURN(error);
}


int ha_berkeley::create(const char *name, register TABLE *form,
			HA_CREATE_INFO *create_info)
{
  char name_buff[FN_REFLEN];
  char part[7];
1991
  uint index=1;
unknown's avatar
unknown committed
1992
  int error;
unknown's avatar
unknown committed
1993 1994 1995 1996 1997
  DBUG_ENTER("ha_berkeley::create");

  fn_format(name_buff,name,"", ha_berkeley_ext,2 | 4);

  /* Create the main table that will hold the real rows */
unknown's avatar
unknown committed
1998 1999
  if ((error= create_sub_table(name_buff,"main",DB_BTREE,0)))
    DBUG_RETURN(error); /* purecov: inspected */
unknown's avatar
unknown committed
2000

2001
  primary_key=table->primary_key;
unknown's avatar
unknown committed
2002
  /* Create the keys */
2003
  for (uint i=0; i < form->keys; i++)
unknown's avatar
unknown committed
2004
  {
2005 2006 2007
    if (i != primary_key)
    {
      sprintf(part,"key%02d",index++);
unknown's avatar
unknown committed
2008 2009 2010 2011
      if ((error= create_sub_table(name_buff, part, DB_BTREE,
				   (table->key_info[i].flags & HA_NOSAME) ? 0 :
				   DB_DUP)))
	DBUG_RETURN(error); /* purecov: inspected */
2012
    }
unknown's avatar
unknown committed
2013 2014 2015 2016
  }

  /* Create the status block to save information from last status command */
  /* Is DB_BTREE the best option here ? (QUEUE can't be used in sub tables) */
unknown's avatar
unknown committed
2017 2018

  DB *status_block;
unknown's avatar
unknown committed
2019
  if (!(error=(db_create(&status_block, db_env, 0))))
unknown's avatar
unknown committed
2020
  {
unknown's avatar
unknown committed
2021
    if (!(error=(status_block->open(status_block, NULL, name_buff,
unknown's avatar
unknown committed
2022
				    "status", DB_BTREE, DB_CREATE, 0))))
unknown's avatar
unknown committed
2023 2024 2025 2026
    {
      char rec_buff[4+MAX_KEY*4];
      uint length= 4+ table->keys*4;
      bzero(rec_buff, length);
unknown's avatar
unknown committed
2027
      error= write_status(status_block, rec_buff, length);
unknown's avatar
unknown committed
2028 2029 2030 2031
      status_block->close(status_block,0);
    }
  }
  DBUG_RETURN(error);
unknown's avatar
unknown committed
2032 2033 2034
}


unknown's avatar
unknown committed
2035

unknown's avatar
unknown committed
2036 2037 2038 2039
int ha_berkeley::delete_table(const char *name)
{
  int error;
  char name_buff[FN_REFLEN];
unknown's avatar
unknown committed
2040
  DBUG_ENTER("delete_table");
unknown's avatar
unknown committed
2041
  if ((error=db_create(&file, db_env, 0)))
2042
    my_errno=error; /* purecov: inspected */
unknown's avatar
unknown committed
2043 2044 2045
  else
    error=file->remove(file,fn_format(name_buff,name,"",ha_berkeley_ext,2 | 4),
		       NULL,0);
unknown's avatar
unknown committed
2046
  file=0;					// Safety
unknown's avatar
unknown committed
2047
  DBUG_RETURN(error);
unknown's avatar
unknown committed
2048 2049
}

2050 2051 2052 2053 2054 2055 2056 2057 2058 2059

int ha_berkeley::rename_table(const char * from, const char * to)
{
  int error;
  char from_buff[FN_REFLEN];
  char to_buff[FN_REFLEN];

  if ((error= db_create(&file, db_env, 0)))
    my_errno= error;
  else
2060 2061
  {
    /* On should not do a file->close() after rename returns */
2062 2063 2064 2065
    error= file->rename(file, 
			fn_format(from_buff, from, "", ha_berkeley_ext, 2 | 4),
			NULL, fn_format(to_buff, to, "", ha_berkeley_ext,
					2 | 4), 0);
2066
  }
2067 2068 2069 2070
  return error;
}


unknown's avatar
unknown committed
2071 2072 2073 2074 2075 2076 2077 2078
/*
  How many seeks it will take to read through the table
  This is to be comparable to the number returned by records_in_range so
  that we can decide if we should scan the table or use keys.
*/

double ha_berkeley::scan_time()
{
2079
  return rows2double(records/3);
2080
}
unknown's avatar
unknown committed
2081

unknown's avatar
unknown committed
2082 2083
/*
  Estimate the number of rows between two keys of index 'keynr'.

  The fractions of the index below / equal to each end point are fetched
  with key_range() and the difference is scaled by 'records'. A missing
  start key means "from the beginning", a missing end key "to the end".
  Returns HA_BERKELEY_RANGE_COUNT if key_range() fails, and never less
  than 1.
*/
ha_rows ha_berkeley::records_in_range(uint keynr, key_range *start_key,
                                      key_range *end_key)
{
  DBT key;
  DB_KEY_RANGE start_range, end_range;
  DB *kfile= key_file[keynr];
  KEY *key_info= &table->key_info[keynr];
  double start_pos, end_pos, rows;
  bool error= 0;
  DBUG_ENTER("ha_berkeley::records_in_range");

  /* Ensure we get maximum range, even for varchar keys with different space */
  key_info->handler.bdb_return_if_eq= -1;
  if (start_key)
    error= (kfile->key_range(kfile, transaction,
                             pack_key(&key, keynr, key_buff,
                                      start_key->key, start_key->length),
                             &start_range, 0) != 0);
  if (error)
  {
    key_info->handler.bdb_return_if_eq= 0;
    // Better than returning an error
    DBUG_RETURN(HA_BERKELEY_RANGE_COUNT);       /* purecov: inspected */
  }

  key_info->handler.bdb_return_if_eq= 1;
  if (end_key)
    error= (kfile->key_range(kfile, transaction,
                             pack_key(&key, keynr, key_buff,
                                      end_key->key, end_key->length),
                             &end_range, 0) != 0);
  key_info->handler.bdb_return_if_eq= 0;
  if (error)
  {
    // Better than returning an error
    DBUG_RETURN(HA_BERKELEY_RANGE_COUNT);       /* purecov: inspected */
  }

  /* Lower bound: rows equal to the key count unless it is an exact read */
  start_pos= 0.0;
  if (start_key)
  {
    start_pos= start_range.less;
    if (start_key->flag != HA_READ_KEY_EXACT)
      start_pos+= start_range.equal;
  }

  /* Upper bound: rows equal to the key count unless reading before key */
  end_pos= 1.0;
  if (end_key)
  {
    end_pos= end_range.less;
    if (end_key->flag != HA_READ_BEFORE_KEY)
      end_pos+= end_range.equal;
  }

  rows= (end_pos-start_pos)*records;
  DBUG_PRINT("exit",("rows: %g",rows));
  if (rows <= 1.0)
    DBUG_RETURN((ha_rows) 1);
  DBUG_RETURN((ha_rows) rows);
}

2137

2138
/*
  Return the next auto increment value for the table.

  The biggest existing value is looked up through the auto increment
  index (table->next_number_index) in key-read mode; the result is that
  value + 1, or 1 if no value could be read.
*/
ulonglong ha_berkeley::get_auto_increment()
{
  int error;
  ulonglong nr= 1;				// Default if error or new key

  (void) ha_berkeley::extra(HA_EXTRA_KEYREAD);

  /* Set 'active_index' */
  ha_berkeley::index_init(table->next_number_index);

  if (table->next_number_key_offset)
  {
    /* Reading next available number for a sub key */
    DBT row, old_key;
    KEY *key_info= &table->key_info[active_index];
    bzero((char*) &row, sizeof(row));

    ha_berkeley::create_key(&last_key, active_index,
                            key_buff, table->record[0],
                            table->next_number_key_offset);
    /* Store for compare */
    old_key.size= last_key.size;
    old_key.data= key_buff2;
    memcpy(key_buff2, key_buff, old_key.size);
    old_key.app_private= (void*) key_info;

    /* Modify the compare so that we will find the next key */
    key_info->handler.bdb_return_if_eq= 1;
    /* We lock the next key as the new key will probl. be on the same page */
    error= cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE | DB_RMW);
    key_info->handler.bdb_return_if_eq= 0;

    if (!error || error == DB_NOTFOUND)
    {
      /*
        Now search go one step back and then we should have found the
        biggest key with the given prefix
      */
      if (!cursor->c_get(cursor, &last_key, &row, DB_PREV | DB_RMW) &&
          !berkeley_cmp_packed_key(key_file[active_index], &old_key,
                                   &last_key))
      {
        error= 0;				// Found value
        unpack_key((char*) table->record[1], &last_key, active_index);
      }
      else
        error= 1;
    }
  }
  else
  {						// Autoincrement at key-start
    error= ha_berkeley::index_last(table->record[1]);
  }

  if (!error)
    nr= (ulonglong)
      table->next_number_field->val_int_offset(table->rec_buff_length)+1;
  ha_berkeley::index_end();
  (void) ha_berkeley::extra(HA_EXTRA_NO_KEYREAD);
  return nr;
}

unknown's avatar
unknown committed
2196 2197 2198 2199 2200 2201
void ha_berkeley::print_error(int error, myf errflag)
{
  if (error == DB_LOCK_DEADLOCK)
    error=HA_ERR_LOCK_DEADLOCK;
  handler::print_error(error,errflag);
}
2202 2203 2204 2205 2206

/****************************************************************************
	 Analyzing, checking, and optimizing tables
****************************************************************************/

2207
#ifdef NOT_YET
2208 2209 2210
/*
  Send one formatted message row (table name, operation, message type,
  message text) to the client through thd->protocol.

  The message is formatted printf style from 'fmt' into a 256 byte buffer.
  If the row can't be written, the connection is marked as killed.
*/
static void print_msg(THD *thd, const char *table_name, const char *op_name,
		      const char *msg_type, const char *fmt, ...)
{
  Protocol *protocol= thd->protocol;
  char msgbuf[256];
  msgbuf[0] = 0;
  va_list args;
  va_start(args,fmt);

  my_vsnprintf(msgbuf, sizeof(msgbuf), fmt, args);
  va_end(args);					// Every va_start needs a va_end
  msgbuf[sizeof(msgbuf) - 1] = 0; // healthy paranoia
  DBUG_PRINT(msg_type,("message: %s",msgbuf));

  protocol->set_nfields(4);
  protocol->prepare_for_resend();
  protocol->store(table_name);
  protocol->store(op_name);
  protocol->store(msg_type);
  protocol->store(msgbuf);
  if (protocol->write())
    thd->killed=THD::KILL_CONNECTION;
}
2230
#endif
2231 2232 2233 2234

/*
  Collect key statistics for the table.

  SYNOPSIS
    ha_berkeley::analyze()
    thd                         Thread whose transactions are inspected
    check_opt                   Not used here

  DESCRIPTION
    For every key, DB->stat() is called and rec_per_key in the share is
    set to bt_ndata / bt_nkeys. The row count of the share is taken from
    the statistics fetched last (those of the main file if the table has
    a hidden primary key). The result is saved with update_status().

  RETURN
    HA_ADMIN_REJECT   The LSNs of the statement and the master transaction
                      of 'thd' differ according to log_compare()
    HA_ADMIN_FAILED   DB->stat() failed, or STATUS_BDB_ANALYZE is still
                      set after update_status()
    HA_ADMIN_OK       Statistics updated
*/
int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt)
{
  uint i;
  DB_BTREE_STAT *stat=0;
  DB_TXN_STAT *txn_stat_ptr= 0;

  /*
   Original bdb documentation says:
   "The DB->stat method cannot be transaction-protected.
   For this reason, it should be called in a thread of
   control that has no open cursors or active transactions."
   So, let's check if there are any changes have been done since
   the beginning of the transaction..
  */

  if (!db_env->txn_stat(db_env, &txn_stat_ptr, 0) && txn_stat_ptr)
  {
    bool changes_in_trans= 0;
    DB_TXN *txn_all= (DB_TXN*) thd->transaction.all.bdb_tid;
    DB_TXN *txn_stmt= (DB_TXN*) thd->transaction.stmt.bdb_tid;

    /*
      The comparison needs both transactions of this thread. st_nactive
      can be >= 2 while one of the handles is 0, so check the handles
      before calling through them.
    */
    if (txn_stat_ptr->st_nactive >= 2 && txn_all && txn_stmt)
    {
      DB_TXN_ACTIVE *atxn_stmt= 0, *atxn_all= 0;
      u_int32_t all_id= txn_all->id(txn_all);
      u_int32_t stmt_id= txn_stmt->id(txn_stmt);

      DB_TXN_ACTIVE *cur= txn_stat_ptr->st_txnarray;
      DB_TXN_ACTIVE *end= cur + txn_stat_ptr->st_nactive;
      for (; cur!=end && (!atxn_stmt || !atxn_all); cur++)
      {
        if (cur->txnid==all_id) atxn_all= cur;
        if (cur->txnid==stmt_id) atxn_stmt= cur;
      }

      if (atxn_stmt && atxn_all &&
          log_compare(&atxn_stmt->lsn,&atxn_all->lsn))
        changes_in_trans= 1;
    }
    /* Free the statistics on every path, also when st_nactive < 2 */
    free(txn_stat_ptr);
    if (changes_in_trans)
      return HA_ADMIN_REJECT;
  }

  for (i=0 ; i < table->keys ; i++)
  {
    /* Release the statistics of the previous key */
    if (stat)
    {
      free(stat);
      stat=0;
    }
    if ((key_file[i]->stat)(key_file[i], (void*) &stat, 0))
      goto err; /* purecov: inspected */
    share->rec_per_key[i]= (stat->bt_ndata /
			    (stat->bt_nkeys ? stat->bt_nkeys : 1));
  }
  /* A hidden primary key is not in key_file[] */
  if (hidden_primary_key)
  {
    if (stat)
    {
      free(stat);
      stat=0;
    }
    if ((file->stat)(file, (void*) &stat, 0))
      goto err; /* purecov: inspected */
  }
  pthread_mutex_lock(&share->mutex);
  share->rows=stat->bt_ndata;
  share->status|=STATUS_BDB_ANALYZE;		// Save status on close
  share->version++;				// Update stat in table
  pthread_mutex_unlock(&share->mutex);
  update_status(share,table);			// Write status to file
  if (stat)
    free(stat);
  return ((share->status & STATUS_BDB_ANALYZE) ? HA_ADMIN_FAILED :
	  HA_ADMIN_OK);

err:
  if (stat) /* purecov: inspected */
    free(stat); /* purecov: inspected */
  return HA_ADMIN_FAILED; /* purecov: inspected */
}

/* OPTIMIZE TABLE is carried out as an ANALYZE: forwards to analyze(). */
int ha_berkeley::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  int admin_result= ha_berkeley::analyze(thd, check_opt);
  return admin_result;
}

/*
  CHECK TABLE entry point.

  Always returns HA_ADMIN_NOT_IMPLEMENTED. The verification code that
  follows the early return is only compiled when NOT_YET is defined and is
  never reached even then.

  NOTE(review): the disabled code calls verify() several times on the same
  handle and closes it afterwards; the Berkeley DB documentation appears to
  forbid touching a handle after DB->verify() - confirm before enabling.
*/
int ha_berkeley::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_berkeley::check");

  DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);

#ifdef NOT_YET
  char name_buff[FN_REFLEN];
  int error;
  DB *tmp_file;
  /*
    To get this to work we need to ensure that no running transaction is
    using the table. We also need to create a new environment without
    locking for this.
  */

  /* The file has to be opened a second time to be able to check it */
  error= db_create(&tmp_file, db_env, 0);
  if (error)
  {
    print_msg(thd, table->real_name, "check", "error",
              "Got error %d creating environment",error);
    DBUG_RETURN(HA_ADMIN_FAILED);
  }

  /* Pass 1: overall structure, compared with the primary key comparator */
  tmp_file->set_bt_compare(tmp_file,
                           (hidden_primary_key ? berkeley_cmp_hidden_key :
                            berkeley_cmp_packed_key));
  tmp_file->app_private= (void*) (table->key_info+table->primary_key);
  fn_format(name_buff,share->table_name,"", ha_berkeley_ext, 2 | 4);
  error= tmp_file->verify(tmp_file, name_buff, NullS, (FILE*) 0,
                          hidden_primary_key ? 0 : DB_NOORDERCHK);
  if (error)
  {
    print_msg(thd, table->real_name, "check", "error",
              "Got error %d checking file structure",error);
    tmp_file->close(tmp_file,0);
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }

  /* Pass 2: key order of every index, one sub database at a time */
  tmp_file->set_bt_compare(tmp_file, berkeley_cmp_packed_key);
  for (uint index=0,i=0 ; i < table->keys ; i++)
  {
    /* Sub database name: "main" for the primary key, "keyNN" otherwise */
    char part[7];
    if (i != primary_key)
      sprintf(part,"key%02d",++index);
    else
      strmov(part,"main");
    tmp_file->app_private= (void*) (table->key_info+i);
    error= tmp_file->verify(tmp_file, name_buff, part, (FILE*) 0,
                            DB_ORDERCHKONLY);
    if (!error)
      continue;
    print_msg(thd, table->real_name, "check", "error",
              "Key %d was not in order (Error: %d)",
              index+ test(i >= primary_key),
              error);
    tmp_file->close(tmp_file,0);
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }
  tmp_file->close(tmp_file,0);
  DBUG_RETURN(HA_ADMIN_OK);
#endif
}

unknown's avatar
unknown committed
2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397
/****************************************************************************
 Handling the shared BDB_SHARE structure that is needed to provide table
 locking.
****************************************************************************/

/*
  Key accessor for a BDB_SHARE: the key is the share's table name.
  Stores the name length in *length and returns a pointer to the name.
  (Presumably registered as the key callback of bdb_open_tables - the
  registration is not in this part of the file.)
*/
static byte* bdb_get_key(BDB_SHARE *share,uint *length,
                         my_bool not_used __attribute__((unused)))
{
  byte *key= (byte*) share->table_name;
  *length= share->table_name_length;
  return key;
}

2398
/*
  Look up the BDB_SHARE for table_name in bdb_open_tables; when there is
  none, allocate a zero-filled one (together with its rec_per_key,
  table_name, key_file and key_type arrays in a single allocation), insert
  it into the hash and initialise its lock and mutex.

  The lookup and insertion run under bdb_mutex. The use count of the share
  is not changed here.

  Returns the share, or 0 when the allocation or the hash insert failed.
*/
static BDB_SHARE *get_share(const char *table_name, TABLE *table)
{
  BDB_SHARE *share;
  pthread_mutex_lock(&bdb_mutex);
  uint length=(uint) strlen(table_name);

  share= (BDB_SHARE*) hash_search(&bdb_open_tables, (byte*) table_name,
                                  length);
  if (share)
  {
    /* Already open: hand out the existing share */
    pthread_mutex_unlock(&bdb_mutex);
    return share;
  }

  ulong *rec_per_key;
  char *tmp_name;
  DB **key_file;
  u_int32_t *key_type;

  share= (BDB_SHARE *)
    my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                    &share, sizeof(*share),
                    &rec_per_key, table->keys * sizeof(ha_rows),
                    &tmp_name, length+1,
                    &key_file, (table->keys+1) * sizeof(*key_file),
                    &key_type, (table->keys+1) * sizeof(u_int32_t),
                    NullS);
  if (!share)
  {
    pthread_mutex_unlock(&bdb_mutex);
    return share;
  }

  /* Wire the sub-allocations into the share; the name is the hash key */
  share->rec_per_key = rec_per_key;
  share->table_name = tmp_name;
  share->table_name_length=length;
  strmov(share->table_name,table_name);
  share->key_file = key_file;
  share->key_type = key_type;

  if (my_hash_insert(&bdb_open_tables, (byte*) share))
  {
    pthread_mutex_unlock(&bdb_mutex); /* purecov: inspected */
    my_free((gptr) share,0); /* purecov: inspected */
    return 0; /* purecov: inspected */
  }
  thr_lock_init(&share->lock);
  pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);

  pthread_mutex_unlock(&bdb_mutex);
  return share;
}

unknown's avatar
unknown committed
2440 2441
/*
  Drop one reference to share. When the use count reaches zero the status
  record is written, every open key file and the status sub database are
  closed, and the share is removed from bdb_open_tables and freed.

  mutex_is_locked tells that the caller holds share->mutex; it is released
  here (after bdb_mutex has been taken).

  Returns 0, or the error of the last close() call that failed.
*/
static int free_share(BDB_SHARE *share, TABLE *table, uint hidden_primary_key,
                      bool mutex_is_locked)
{
  int error, result = 0;
  uint keys=table->keys + test(hidden_primary_key);

  pthread_mutex_lock(&bdb_mutex);
  if (mutex_is_locked)
    pthread_mutex_unlock(&share->mutex); /* purecov: inspected */

  if (--share->use_count)
  {
    /* Still in use by someone else: nothing to tear down */
    pthread_mutex_unlock(&bdb_mutex);
    return result;
  }

  DB **key_file = share->key_file;
  update_status(share,table);

  /* this does share->file->close() implicitly */
  for (uint i=0; i < keys; i++)
  {
    DB *handle= key_file[i];
    if (!handle)
      continue;
    if ((error= handle->close(handle,0)))
      result=error; /* purecov: inspected */
  }

  if (share->status_block)
  {
    if ((error = share->status_block->close(share->status_block,0)))
      result = error; /* purecov: inspected */
  }

  hash_delete(&bdb_open_tables, (byte*) share);
  thr_lock_delete(&share->lock);
  pthread_mutex_destroy(&share->mutex);
  my_free((gptr) share, MYF(0));

  pthread_mutex_unlock(&bdb_mutex);
  return result;
}

2470 2471 2472 2473
/*
  Get status information that is stored in the 'status' sub database
  and the max used value for the hidden primary key.
*/
2474

2475
void ha_berkeley::get_status()
2476
{
2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497
  if (!test_all_bits(share->status,(STATUS_PRIMARY_KEY_INIT |
				    STATUS_ROW_COUNT_INIT)))
  {
    pthread_mutex_lock(&share->mutex);
    if (!(share->status & STATUS_PRIMARY_KEY_INIT))
    {
      (void) extra(HA_EXTRA_KEYREAD);
      index_init(primary_key);
      if (!index_last(table->record[1]))
	share->auto_ident=uint5korr(current_ident);
      index_end();
      (void) extra(HA_EXTRA_NO_KEYREAD);
    }
    if (! share->status_block)
    {
      char name_buff[FN_REFLEN];
      uint open_mode= (((table->db_stat & HA_READ_ONLY) ? DB_RDONLY : 0)
		       | DB_THREAD);
      fn_format(name_buff, share->table_name,"", ha_berkeley_ext, 2 | 4);
      if (!db_create(&share->status_block, db_env, 0))
      {
unknown's avatar
unknown committed
2498
	if (share->status_block->open(share->status_block, NULL, name_buff,
unknown's avatar
unknown committed
2499
				      "status", DB_BTREE, open_mode, 0))
2500
	{
2501 2502
	  share->status_block->close(share->status_block, 0); /* purecov: inspected */
	  share->status_block=0; /* purecov: inspected */
2503 2504 2505 2506 2507 2508 2509
	}
      }
    }
    if (!(share->status & STATUS_ROW_COUNT_INIT) && share->status_block)
    {
      share->org_rows=share->rows=
	table->max_rows ? table->max_rows : HA_BERKELEY_MAX_ROWS;
2510
      if (!share->status_block->cursor(share->status_block, 0, &cursor, 0))
2511 2512
      {
	DBT row;
unknown's avatar
unknown committed
2513
	char rec_buff[64];
2514 2515 2516
	bzero((char*) &row,sizeof(row));
	bzero((char*) &last_key,sizeof(last_key));
	row.data=rec_buff;
unknown's avatar
unknown committed
2517
	row.ulen=sizeof(rec_buff);
2518 2519 2520 2521
	row.flags=DB_DBT_USERMEM;
	if (!cursor->c_get(cursor, &last_key, &row, DB_FIRST))
	{
	  uint i;
unknown's avatar
unknown committed
2522
	  uchar *pos=(uchar*) row.data;
2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538
	  share->org_rows=share->rows=uint4korr(pos); pos+=4;
	  for (i=0 ; i < table->keys ; i++)
	  {
	    share->rec_per_key[i]=uint4korr(pos); pos+=4;
	  }
	}
	cursor->c_close(cursor);
      }
      cursor=0;					// Safety
    }
    share->status|= STATUS_PRIMARY_KEY_INIT | STATUS_ROW_COUNT_INIT;
    pthread_mutex_unlock(&share->mutex);
  }
}


unknown's avatar
unknown committed
2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555
/*
  Store the first 'length' bytes of buff as the status record of
  status_block, using the fixed key buffer "status".

  Returns the result of the put() call.

  NOTE(review): key.size is sizeof(key_buff), and key_buff is a pointer,
  so the key length is the size of a pointer rather than the length of the
  string. It is kept as is because it determines the key under which the
  record is stored in existing files - confirm before changing.
*/
static int write_status(DB *status_block, char *buff, uint length)
{
  const char *key_buff="status";
  DBT key,row;

  bzero((char*) &key,sizeof(key));
  key.data=(void*) key_buff;
  key.size=sizeof(key_buff);

  bzero((char*) &row,sizeof(row));
  row.data=buff;
  row.size=length;

  return status_block->put(status_block, 0, &key, &row, 0);
}


2556 2557 2558 2559 2560
/*
  Write the row count and the per-key rec_per_key values of share to the
  'status' sub database.

  Nothing is done unless the row count differs from the last saved one
  (share->org_rows) or STATUS_BDB_ANALYZE is set. The work is done under
  share->mutex. The 'status' sub database is created/opened on demand.

  Record layout: 4 byte row count followed by one 4 byte value per key.

  STATUS_BDB_ANALYZE is cleared and org_rows updated only when the record
  was really written, so a caller that tests the flag afterwards sees a
  failed save, and a later call tries again.

  If opening the sub database fails, the handle is closed and
  share->status_block is reset to 0, so that no later call uses a handle
  that was never opened.
*/
static void update_status(BDB_SHARE *share, TABLE *table)
{
  DBUG_ENTER("update_status");
  if (share->rows != share->org_rows ||
      (share->status & STATUS_BDB_ANALYZE))
  {
    pthread_mutex_lock(&share->mutex);
    if (!share->status_block)
    {
      /*
        Create sub database 'status' if it doesn't exist from before
        (This '*should*' always exist for table created with MySQL)
      */

      char name_buff[FN_REFLEN]; /* purecov: inspected */
      if (db_create(&share->status_block, db_env, 0)) /* purecov: inspected */
      {
        share->status_block=0; /* purecov: inspected */
        goto end; /* purecov: inspected */
      }
      share->status_block->set_flags(share->status_block,0); /* purecov: inspected */
      if (share->status_block->open(share->status_block, NULL,
                                    fn_format(name_buff,share->table_name,"",
                                              ha_berkeley_ext,2 | 4),
                                    "status", DB_BTREE,
                                    DB_THREAD | DB_CREATE, my_umask)) /* purecov: inspected */
      {
        /* Don't keep (or leak) a handle that could not be opened */
        share->status_block->close(share->status_block, 0); /* purecov: inspected */
        share->status_block=0; /* purecov: inspected */
        goto end; /* purecov: inspected */
      }
    }
    {
      char rec_buff[4+MAX_KEY*4], *pos=rec_buff;
      int4store(pos,share->rows); pos+=4;
      for (uint i=0 ; i < table->keys ; i++)
      {
        int4store(pos,share->rec_per_key[i]); pos+=4;
      }
      DBUG_PRINT("info",("updating status for %s",share->table_name));
      if (!write_status(share->status_block, rec_buff,
                        (uint) (pos-rec_buff)))
      {
        /* Saved: remember what is on disk now */
        share->status&= ~STATUS_BDB_ANALYZE;
        share->org_rows=share->rows;
      }
    }
end:
    pthread_mutex_unlock(&share->mutex);
  }
  DBUG_VOID_RETURN;
}

2600

2601 2602 2603 2604 2605
/*
  Return an estimate of the number of rows in the table.
  Used when sorting to allocate buffers and by the optimizer.
*/

unknown's avatar
unknown committed
2606
ha_rows ha_berkeley::estimate_rows_upper_bound()
{
  /* Shared row counter plus the fixed HA_BERKELEY_EXTRA_ROWS margin */
  ha_rows upper_bound= share->rows + HA_BERKELEY_EXTRA_ROWS;
  return upper_bound;
}

2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625
int ha_berkeley::cmp_ref(const byte *ref1, const byte *ref2)
{
  if (hidden_primary_key)
    return memcmp(ref1, ref2, BDB_HIDDEN_PRIMARY_KEY_LENGTH);

  int result;
  Field *field;
  KEY *key_info=table->key_info+table->primary_key;
  KEY_PART_INFO *key_part=key_info->key_part;
  KEY_PART_INFO *end=key_part+key_info->key_parts;

  for (; key_part != end; key_part++)
  {
    field=  key_part->field; 
    result= field->pack_cmp((const char*)ref1, (const char*)ref2, 
2626
                            key_part->length, 0);
2627 2628
    if (result)
      return result;
2629 2630
    ref1+= field->packed_col_length((const char*)ref1, key_part->length);
    ref2+= field->packed_col_length((const char*)ref2, key_part->length);
2631 2632 2633 2634 2635
  }

  return 0;
}

unknown's avatar
unknown committed
2636
#endif /* HAVE_BERKELEY_DB */