/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "mysql_priv.h"

#include "ha_archive.h"
#include <my_dir.h>

/*
  First, if you want to understand storage engines you should look at 
  ha_example.cc and ha_example.h. 
  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are 
  complete table scans. Compression is done through azip (bzip compresses
  better, but only marginally; if someone asks I could add support for
  it too, but be aware that it costs a lot more in CPU time than azip).

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes, but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second 
  purpose is to determine if the table was closed properly or not. When the 
  meta file is first opened it is marked as dirty. It is opened when the table 
  itself is opened for writing. When the table is closed the new count for rows 
  is written to the meta file and the file is marked as clean. If the meta file 
  is opened and it is marked as dirty, it is assumed that a crash occurred. At 
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read. 

  For performance, as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be whether the table can fit into the buffer
  pool. For MyISAM it is a question of how much of the file the file system
  caches. With enough free memory MyISAM is faster. It is only when the OS
  doesn't have enough memory to cache the entire table that archive turns out 
  to be any faster. For writes it is always a bit slower than MyISAM. It has no
  internal limits, though, on row length.

  Examples comparing MyISAM (packed) and Archive:

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Add optional bzip support.
   Allow users to set the compression level.
   Add a truncate table command.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Talk to the azip guys, come up with a writable format so that updates are doable
     without switching to a block method.
   Add an optional feature so that rows can be flushed at an interval (which will cause less
     compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Dirty open (right now the meta file is repaired if a crash occurred).
   Option to allow for dirty reads; this would lower the sync calls, which would make
     inserts a lot faster, but would mean highly arbitrary reads.

    -Brian
*/
/*
  Notes on file formats.
  The Meta file is laid out as:
  check - Just an int of 254 to make sure that the file we are opening was
          never corrupted.
  version - The current version of the file format.
  rows - This is an unsigned long long which is the number of rows in the data
         file.
  check point - Reserved for future use
  auto increment - MAX value for autoincrement
  dirty - Status of the file, whether or not its values are the latest. This
          flag is what causes a repair to occur

  The data file:
  check - Just an int of 254 to make sure that the file we are opening was
          never corrupted.
  version - The current version of the file format.
  data - The data is stored in a "row + blobs" format.
*/
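
/*
  Illustrative layout sketch (an editorial addition, not part of the original
  comments; the offsets are implied by META_BUFFER_SIZE, write_meta_file(),
  write_data_header() and real_write_row() below):

  Meta file (.ARM), META_BUFFER_SIZE == 27 bytes:
    offset  0  check           uchar      ARCHIVE_CHECK_HEADER (254)
    offset  1  version         uchar      ARCHIVE_VERSION
    offset  2  rows            ulonglong  number of rows in the data file
    offset 10  check point     ulonglong  reserved for future use
    offset 18  auto increment  ulonglong  current MAX autoincrement value
    offset 26  dirty           uchar      TRUE while the table is open for writing

  Data file (.ARZ), as seen through the azio compression layer:
    offset  0  check           uchar      ARCHIVE_CHECK_HEADER (254)
    offset  1  version         uchar      ARCHIVE_VERSION
    offset  2  rows, each stored as the fixed-length record image
               (table->s->reclength bytes) followed by the raw contents of
               any non-empty blobs, in field-creation order.
*/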

/* If the archive storage engine has been inited */
static bool archive_inited= FALSE;
/* Variables for archive share methods */
pthread_mutex_t archive_mutex;
static HASH archive_open_tables;

/* The file extensions */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file
/*
  uchar + uchar + ulonglong + ulonglong + ulonglong + uchar
*/
#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
  + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar)

/*
  uchar + uchar
*/
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption

/* Static declarations for handlerton */
static handler *archive_create_handler(TABLE_SHARE *table);
/*
  Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2


/* dummy handlerton - only to have something to return from archive_db_init */
handlerton archive_hton = {
  MYSQL_HANDLERTON_INTERFACE_VERSION,
  "ARCHIVE",
  SHOW_OPTION_YES,
  "Archive storage engine", 
  DB_TYPE_ARCHIVE_DB,
  archive_db_init,
  0,       /* slot */
  0,       /* savepoint size. */
  NULL,    /* close_connection */
  NULL,    /* savepoint */
  NULL,    /* rollback to savepoint */
  NULL,    /* release savepoint */
  NULL,    /* commit */
  NULL,    /* rollback */
  NULL,    /* prepare */
  NULL,    /* recover */
  NULL,    /* commit_by_xid */
  NULL,    /* rollback_by_xid */
  NULL,    /* create_cursor_read_view */
  NULL,    /* set_cursor_read_view */
  NULL,    /* close_cursor_read_view */
  archive_create_handler,    /* Create a new handler */
  NULL,    /* Drop a database */
  archive_db_end,    /* Panic call */
  NULL,    /* Start Consistent Snapshot */
  NULL,    /* Flush logs */
  NULL,    /* Show status */
  NULL,    /* Partition flags */
  NULL,    /* Alter table flags */
  NULL,    /* Alter interface */
  HTON_NO_FLAGS,
  NULL, /* binlog_func */
  NULL /* binlog_log_query */
};

static handler *archive_create_handler(TABLE_SHARE *table)
{
  return new ha_archive(table);
}

/*
  Used for hash table that tracks open tables.
*/
static byte* archive_get_key(ARCHIVE_SHARE *share,uint *length,
                             my_bool not_used __attribute__((unused)))
{
  *length=share->table_name_length;
  return (byte*) share->table_name;
}


/*
  Initialize the archive handler.

  SYNOPSIS
    archive_db_init()
    void

  RETURN
    FALSE       OK
    TRUE        Error
*/

bool archive_db_init()
{
  DBUG_ENTER("archive_db_init");
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
    goto error;
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
                (hash_get_key) archive_get_key, 0, 0))
  {
    VOID(pthread_mutex_destroy(&archive_mutex));
  }
  else
  {
    archive_inited= TRUE;
    DBUG_RETURN(FALSE);
  }
error:
  have_archive_db= SHOW_OPTION_DISABLED;	// If we couldn't use handler
  DBUG_RETURN(TRUE);
}

/*
  Release the archive handler.

  SYNOPSIS
    archive_db_end()
    void

  RETURN
    FALSE       OK
*/

int archive_db_end(ha_panic_function type)
{
  if (archive_inited)
  {
    hash_free(&archive_open_tables);
    VOID(pthread_mutex_destroy(&archive_mutex));
  }
  archive_inited= 0;
  return 0;
}

ha_archive::ha_archive(TABLE_SHARE *table_arg)
  :handler(&archive_hton, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length = sizeof(my_off_t);
}

/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
{
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::read_data_header");

  if (azrewind(file_to_read) == -1)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (azread(file_to_read, data_buffer, DATA_BUFFER_SIZE) != DATA_BUFFER_SIZE)
    DBUG_RETURN(errno ? errno : -1);
  
  DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0]));
  DBUG_PRINT("ha_archive::read_data_header", ("Version %u", data_buffer[1]));
  
  if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&  
      (data_buffer[1] != (uchar)ARCHIVE_VERSION))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  DBUG_RETURN(0);
}

/*
  This method writes out the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::write_data_header(azio_stream *file_to_write)
{
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::write_data_header");

  data_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER;
  data_buffer[1]= (uchar)ARCHIVE_VERSION;

  if (azwrite(file_to_write, &data_buffer, DATA_BUFFER_SIZE) != 
      DATA_BUFFER_SIZE)
    goto error;
  DBUG_PRINT("ha_archive::write_data_header", ("Check %u", (uint)data_buffer[0]));
  DBUG_PRINT("ha_archive::write_data_header", ("Version %u", (uint)data_buffer[1]));

  DBUG_RETURN(0);
error:
  DBUG_RETURN(errno);
}

/*
  This method reads the header of a meta file and returns whether or not it was successful.
  *rows will contain the current number of rows in the data file upon success.
*/
int ha_archive::read_meta_file(File meta_file, ha_rows *rows, 
                               ulonglong *auto_increment)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;
  ulonglong check_point;

  DBUG_ENTER("ha_archive::read_meta_file");

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
  if (my_read(meta_file, (byte*)meta_buffer, META_BUFFER_SIZE, 0) != META_BUFFER_SIZE)
    DBUG_RETURN(-1);
  
  /*
    Parse out the meta data, we ignore version at the moment
  */

  ptr+= sizeof(uchar)*2; // Move past header
  *rows= (ha_rows)uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past rows
  check_point= uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past check_point
  *auto_increment= uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past auto_increment

  DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows));
  DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point));
  DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment));
  DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));

  if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) || 
      ((bool)(*ptr)== TRUE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  my_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}

/*
  This method writes out the header of a meta file and returns whether or not it was successful.
  By setting dirty you say whether or not the file represents the actual state of the data file.
  Upon ::open() we set to dirty, and upon ::close() we set to clean.
*/
int ha_archive::write_meta_file(File meta_file, ha_rows rows, 
                                ulonglong auto_increment, bool dirty)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;
  ulonglong check_point= 0; //Reserved for the future

  DBUG_ENTER("ha_archive::write_meta_file");

  *ptr= (uchar)ARCHIVE_CHECK_HEADER;
  ptr += sizeof(uchar);
  *ptr= (uchar)ARCHIVE_VERSION;
  ptr += sizeof(uchar);
  int8store(ptr, (ulonglong)rows); 
  ptr += sizeof(ulonglong);
  int8store(ptr, check_point); 
  ptr += sizeof(ulonglong);
  int8store(ptr, auto_increment); 
  ptr += sizeof(ulonglong);
  *ptr= (uchar)dirty;
  DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", 
                                             (uint)ARCHIVE_CHECK_HEADER));
  DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", 
                                             (uint)ARCHIVE_VERSION));
  DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows));
  DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point));
  DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu",
                                             auto_increment));
  DBUG_PRINT("ha_archive::write_meta_file", ("Dirty %d", (uint)dirty));

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
  if (my_write(meta_file, (byte *)meta_buffer, META_BUFFER_SIZE, 0) != META_BUFFER_SIZE)
    DBUG_RETURN(-1);
  
  my_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}


/*
  We create the shared structure that we will use for the open table. 
  No matter what we try to get or create a share. This is so that a repair
  table operation can occur. 

  See ha_example.cc for a longer description.
*/
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, 
                                     TABLE *table, int *rc)
{
  ARCHIVE_SHARE *share;
  char meta_file_name[FN_REFLEN];
  uint length;
  char *tmp_name;
  DBUG_ENTER("ha_archive::get_share");

  pthread_mutex_lock(&archive_mutex);
  length=(uint) strlen(table_name);

  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
                                           (byte*) table_name,
                                           length)))
  {
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                          &share, sizeof(*share),
                          &tmp_name, length+1,
                          NullS)) 
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_OUT_OF_MEM;
      DBUG_RETURN(NULL);
    }

    share->use_count= 0;
    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
    fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
    strmov(share->table_name,table_name);
    /*
      We will use this lock for rows.
    */
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
    if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
      share->crashed= TRUE;
    
    /*
      After we read, we set the file to dirty. When we close, we will do the 
      opposite. If the meta file will not open we assume it is crashed and
      leave it up to the user to fix.
    */
    if (read_meta_file(share->meta_file, &share->rows_recorded, 
                       &share->auto_increment_value))
      share->crashed= TRUE;
    else
      (void)write_meta_file(share->meta_file, share->rows_recorded,
                            share->auto_increment_value, TRUE);
    /* 
      It is expensive to open and close the data files and since you can't have
      a gzip file that can be both read and written we keep a writer open
      that is shared among all open tables.
    */
    if (!(azopen(&(share->archive_write), share->data_file_name, 
                 O_WRONLY|O_APPEND|O_BINARY)))
    {
      DBUG_PRINT("info", ("Could not open archive write file"));
      share->crashed= TRUE;
    }
    VOID(my_hash_insert(&archive_open_tables, (byte*) share));
    thr_lock_init(&share->lock);
  }
  share->use_count++;
  DBUG_PRINT("info", ("archive table %.*s has %d open handles now", 
                      share->table_name_length, share->table_name,
                      share->use_count));
  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&archive_mutex);

  DBUG_RETURN(share);
}


/* 
  Free the share.
  See ha_example.cc for a description.
*/
int ha_archive::free_share(ARCHIVE_SHARE *share)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::free_share");
  DBUG_PRINT("info", ("archive table %.*s has %d open handles on entrance", 
                      share->table_name_length, share->table_name,
                      share->use_count));

  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
  {
    hash_delete(&archive_open_tables, (byte*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    if (share->crashed)
      (void)write_meta_file(share->meta_file, share->rows_recorded,
                            share->auto_increment_value, TRUE);
    else
      (void)write_meta_file(share->meta_file, share->rows_recorded,
                            share->auto_increment_value, FALSE);
    if (azclose(&(share->archive_write)))
      rc= 1;
    if (my_close(share->meta_file, MYF(0)))
      rc= 1;
    my_free((gptr) share, MYF(0));
  }
  pthread_mutex_unlock(&archive_mutex);

  DBUG_RETURN(rc);
}


/*
  We just implement the additional file extensions we use.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  ARM,
  NullS
};

const char **ha_archive::bas_ext() const
{
  return ha_archive_exts;
}


/* 
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int mode, uint open_options)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::open");

  DBUG_PRINT("info", ("archive table was opened for crash %s", 
                      (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
  share= get_share(name, table, &rc);

  if (rc == HA_ERR_CRASHED_ON_USAGE && !(open_options & HA_OPEN_FOR_REPAIR))
  {
    free_share(share);
    DBUG_RETURN(rc);
  }
  else if (rc == HA_ERR_OUT_OF_MEM)
  {
    DBUG_RETURN(rc);
  }

  thr_lock_data_init(&share->lock,&lock,NULL);

  if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
  {
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  DBUG_PRINT("info", ("archive table was crashed %s", 
                      rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
  {
    DBUG_RETURN(0);
  }
  else
    DBUG_RETURN(rc);
}


/*
  Closes the file.

  SYNOPSIS
    close();
  
  IMPLEMENTATION:

  We first close this storage engine's file handle to the archive and
  then remove our reference count to the table (and possibly free it
  as well).

  RETURN
    0  ok
    1  Error
*/

int ha_archive::close(void)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::close");

  /* First close stream */
  if (azclose(&archive))
    rc= 1;
  /* then also close share */
  rc|= free_share(share);

  DBUG_RETURN(rc);
}


/*
  We create our data file here. The format is pretty simple. 
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we 
  are about to do a general compression, packing would just be a waste of 
  CPU time. If the table has blobs they are written after the row in the order 
  of creation.
*/

int ha_archive::create(const char *name, TABLE *table_arg,
                       HA_CREATE_INFO *create_info)
{
  File create_file;  // We use this to create the datafile and the metafile
  char name_buff[FN_REFLEN];
  int error;
  DBUG_ENTER("ha_archive::create");

  auto_increment_value= (create_info->auto_increment_value ?
                   create_info->auto_increment_value -1 :
                   (ulonglong) 0);

  if ((create_file= my_create(fn_format(name_buff,name,"",ARM,
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
  {
    error= my_errno;
    goto error;
  }

  for (uint key= 0; key < table_arg->s->keys; key++)
  {
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part=     pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= -1;
        goto error;
      }
    }
  }

  write_meta_file(create_file, 0, auto_increment_value, FALSE);
  my_close(create_file,MYF(0));

  /* 
    We reuse name_buff since it is available.
  */
  if ((create_file= my_create(fn_format(name_buff,name,"",ARZ,
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
  {
    error= my_errno;
    goto error;
  }
  if (!azdopen(&archive, create_file, O_WRONLY|O_BINARY))
  {
    error= errno;
    goto error2;
  }
  if (write_data_header(&archive))
  {
    error= errno;
    goto error3;
  }

  if (azclose(&archive))
  {
    error= errno;
    goto error2;
  }

  DBUG_RETURN(0);

error3:
  /* We already have an error, so ignore results of azclose. */
  (void)azclose(&archive);
error2:
  my_close(create_file, MYF(0));
  delete_table(name);
error:
  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);
}

/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(byte *buf, azio_stream *writer)
{
  my_off_t written;
  uint *ptr, *end;
  DBUG_ENTER("ha_archive::real_write_row");

  written= azwrite(writer, buf, table->s->reclength);
  DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", 
                                            written, table->s->reclength));
  if (!delayed_insert || !bulk_insert)
    share->dirty= TRUE;

  if (written != (my_off_t)table->s->reclength)
    DBUG_RETURN(errno ? errno : -1);
  /*
    We should probably mark the table as damaged if the record is written
    but the blob fails.
  */
  for (ptr= table->s->blob_field, end= ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    char *data_ptr;
    uint32 size= ((Field_blob*) table->field[*ptr])->get_length();

    if (size)
    {
      ((Field_blob*) table->field[*ptr])->get_ptr(&data_ptr);
      written= azwrite(writer, data_ptr, (unsigned)size);
      if (written != (my_off_t)size)
        DBUG_RETURN(errno ? errno : -1);
    }
  }
  DBUG_RETURN(0);
}


/* 
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only gain
  from implementing start_bulk_insert() is that we could skip 
  setting dirty to true each time.
*/
int ha_archive::write_row(byte *buf)
{
  int rc;
  byte *read_buf= NULL;
  ulonglong temp_auto;
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);

  if (table->next_number_field)
  {
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();

    /*
      Bad news, this will cause a search for the unique value which is very 
      expensive since we will have to do a table scan which will lock up 
      all other writers during this period. This could perhaps be optimized 
      in the future.
    */
    if (temp_auto == share->auto_increment_value && 
        mkey->flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }

    if (temp_auto < share->auto_increment_value && 
        mkey->flags & HA_NOSAME)
    {
      /* 
        First we create a buffer that we can use for reading rows, and can pass
        to get_row().
      */
      if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
      {
        rc= HA_ERR_OUT_OF_MEM;
        goto error;
      }
       /* 
         All of the buffer must be written out or we won't see all of the
         data 
       */
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      /*
        Set the position of the local read thread to the beginning position.
      */
      if (read_data_header(&archive))
      {
        rc= HA_ERR_CRASHED_ON_USAGE;
        goto error;
      }

      /*
        Now we read and check all of the rows.
        if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length()))
        if ((longlong)temp_auto == 
            mfield->val_int((char*)(read_buf + mfield->offset())))
      */
      Field *mfield= table->next_number_field;

      while (!(get_row(&archive, read_buf)))
      {
        if (!memcmp(read_buf + mfield->offset(), table->next_number_field->ptr,
                    mfield->max_length()))
        {
          rc= HA_ERR_FOUND_DUPP_KEY;
          goto error;
        }
      }
    }
    else
    {
      if (temp_auto > share->auto_increment_value)
        auto_increment_value= share->auto_increment_value= temp_auto;
    }
  }

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */

  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));
error:
  pthread_mutex_unlock(&share->mutex);
  if (read_buf)
    my_free((gptr) read_buf, MYF(0));

  DBUG_RETURN(rc);
}


ulonglong ha_archive::get_auto_increment()
{
  return share->auto_increment_value + 1;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
  DBUG_ENTER("ha_archive::index_init");
  active_index= keynr;
  DBUG_RETURN(0);
}


/*
  No indexes, so if we get a request for an index search (since we tell
  the optimizer that we have unique indexes), we scan
*/
int ha_archive::index_read(byte *buf, const byte *key,
                             uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
  DBUG_RETURN(rc);
}


int ha_archive::index_read_idx(byte *buf, uint index, const byte *key,
                                 uint key_len, enum ha_rkey_function find_flag)
{
  int rc= 0;
  bool found= 0;
  KEY *mkey= &table->s->key_info[index];
  current_k_offset= mkey->key_part->offset;
  current_key= key;
  current_key_len= key_len;


  DBUG_ENTER("ha_archive::index_read_idx");

  /* 
    All of the buffer must be written out or we won't see all of the
    data 
  */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Set the position of the local read thread to the beginning position.
  */
  if (read_data_header(&archive))
  {
    rc= HA_ERR_CRASHED_ON_USAGE;
    goto error;
  }

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  if (found)
    DBUG_RETURN(0);

error:
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}


int ha_archive::index_next(byte * buf) 
{ 
  bool found= 0;

  DBUG_ENTER("ha_archive::index_next");

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
}

/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/

int ha_archive::rnd_init(bool scan)
{
  DBUG_ENTER("ha_archive::rnd_init");
  
  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* We rewind the file so that we can read from the beginning if scan */
  if (scan)
  {
    scan_rows= share->rows_recorded;
    DBUG_PRINT("info", ("archive will retrieve %llu rows", scan_rows));
    records= 0;

    /* 
      If dirty, we lock, and then reset/flush the data.
      I found that just calling azflush() doesn't always work.
    */
    if (share->dirty == TRUE)
    {
      pthread_mutex_lock(&share->mutex);
      if (share->dirty == TRUE)
      {
        DBUG_PRINT("info", ("archive flushing out rows for scan"));
        azflush(&(share->archive_write), Z_SYNC_FLUSH);
        share->dirty= FALSE;
      }
      pthread_mutex_unlock(&share->mutex);
    }

    if (read_data_header(&archive))
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  DBUG_RETURN(0);
}


/*
  This is the method that is used to read a row. It assumes that the row is 
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
{
  int read; // Bytes read, azread() returns int
  uint *ptr, *end;
  char *last;
  size_t total_blob_length= 0;
  DBUG_ENTER("ha_archive::get_row");

  read= azread(file_to_read, buf, table->s->reclength);
  DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, 
                                     table->s->reclength));

  if (read == Z_STREAM_ERROR)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* If we read nothing we are at the end of the file */
  if (read == 0)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* 
    If the record is the wrong size, the file is probably damaged, unless 
    we are dealing with a delayed insert or a bulk insert.
  */
  if ((ulong) read != table->s->reclength)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* Calculate blob length, we use this for our buffer */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    if (ha_get_bit_in_read_set(((Field_blob*) table->field[*ptr])->fieldnr))
      total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
  }

  /* Adjust our row buffer if need be */
  buffer.alloc(total_blob_length);
  last= (char *)buffer.ptr();

  /* Loop through our blobs and read them */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    size_t size= ((Field_blob*) table->field[*ptr])->get_length();
    if (size)
    {
      if (ha_get_bit_in_read_set(((Field_blob*) table->field[*ptr])->fieldnr))
      {
        read= azread(file_to_read, last, size);
        if ((size_t) read != size)
          DBUG_RETURN(HA_ERR_END_OF_FILE);
        ((Field_blob*) table->field[*ptr])->set_ptr(size, last);
        last += size;
      }
      else
      {
        (void)azseek(file_to_read, size, SEEK_CUR);
      }
    }
  }
  DBUG_RETURN(0);
}


/* 
  Called during ORDER BY. Its position is either from being called sequentially
  or by having had ha_archive::rnd_pos() called before it is called.
*/

int ha_archive::rnd_next(byte *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::rnd_next");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (!scan_rows)
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  scan_rows--;

  statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
		      &LOCK_status);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);


  if (rc != HA_ERR_END_OF_FILE)
    records++;

  DBUG_RETURN(rc);
}


/*
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/

void ha_archive::position(const byte *record)
{
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);
  DBUG_VOID_RETURN;
}


/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/

int ha_archive::rnd_pos(byte * buf, byte *pos)
{
  DBUG_ENTER("ha_archive::rnd_pos");
  statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
		      &LOCK_status);
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
  (void)azseek(&archive, current_position, SEEK_SET);

  DBUG_RETURN(get_row(&archive, buf));
}

/*
  This method repairs the meta file. It does this by walking the datafile and 
  rewriting the meta file. Currently it does this by calling optimize with
  the extended flag.
*/
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::repair");
  check_opt->flags= T_EXTEND;
  int rc= optimize(thd, check_opt);

  if (rc)
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);

  share->crashed= FALSE;
  DBUG_RETURN(0);
}

/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely. 
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::optimize");
  int rc;
  azio_stream writer;
  char writer_filename[FN_REFLEN];

  /* Flush any waiting data */
  azflush(&(share->archive_write), Z_SYNC_FLUSH);

  /* Let's create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN, 
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_WRONLY|O_TRUNC|O_BINARY)))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 

  /* 
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
    Any dead rows are removed (aka rows that may have been partially recorded). 
  */

  if (check_opt->flags == T_EXTEND)
  {
    DBUG_PRINT("info", ("archive extended rebuild"));
    byte *buf; 

    /* 
      First we create a buffer that we can use for reading rows, and can pass
      to get_row().
    */
    if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
    {
      rc= HA_ERR_OUT_OF_MEM;
      goto error;
    }

    /*
      Now we will rewind the archive file so that we are positioned at the 
      start of the file.
    */
    rc= read_data_header(&archive);
    
    /*
      Assuming no error from rewinding the archive file, we now write out the 
      new header for our data file.
    */
    if (!rc)
      rc= write_data_header(&writer);

    /* 
      On success of writing out the new header, we now fetch each row and
      insert it into the new archive file. 
    */
    if (!rc)
    {
      share->rows_recorded= 0;
      auto_increment_value= share->auto_increment_value= 0;
      while (!(rc= get_row(&archive, buf)))
      {
        real_write_row(buf, &writer);
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          if (share->auto_increment_value < 
              field->val_int((char*)(buf + field->offset())))
            auto_increment_value= share->auto_increment_value=
              field->val_int((char*)(buf + field->offset()));
        }
        share->rows_recorded++;
      }
    }
    DBUG_PRINT("info", ("recovered %llu archive rows", share->rows_recorded));

    my_free((char*)buf, MYF(0));
    if (rc && rc != HA_ERR_END_OF_FILE)
      goto error;
  } 
  else
  {
    DBUG_PRINT("info", ("archive quick rebuild"));
    /* 
      The quick method is to just read the data raw, and then compress it directly.
    */
    int read; // Bytes read, azread() returns int
    char block[IO_SIZE];
    if (azrewind(&archive) == -1)
    {
      rc= HA_ERR_CRASHED_ON_USAGE;
      DBUG_PRINT("info", ("archive HA_ERR_CRASHED_ON_USAGE"));
      goto error;
    }

    while ((read= azread(&archive, block, IO_SIZE)))
      azwrite(&writer, block, read);
  }

  azflush(&writer, Z_SYNC_FLUSH);
  share->dirty= FALSE;
  azclose(&(share->archive_write));
  share->archive_write= writer; 

  my_rename(writer_filename,share->data_file_name,MYF(0));

  /*
    Now we need to reopen our read descriptor since it has changed.
  */
  azclose(&archive);
  if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
  {
    rc= HA_ERR_CRASHED_ON_USAGE;
    goto error;
  }


  DBUG_RETURN(0); 

error:
  azclose(&writer);

  DBUG_RETURN(rc); 
}

/* 
  Below is an example of how to setup row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
  else
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
  {
    /* 
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set 
      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers 
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !thd->in_lock_tables
        && !thd->tablespace_op)
      lock_type = TL_WRITE_ALLOW_WRITE;

    /* 
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2. 
    */

    if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) 
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;

  return to;
}

void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
{
  ha_archive::info(HA_STATUS_AUTO | HA_STATUS_CONST);
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
  {
    create_info->auto_increment_value=auto_increment_value;
  }
}


/*
  Hints for optimizer, see ha_tina for more information
*/
void ha_archive::info(uint flag)
{
  DBUG_ENTER("ha_archive::info");
  /* 
    This should be an accurate number now, though bulk and delayed inserts can
    cause the number to be inaccurate.
  */
  records= share->rows_recorded;
  deleted= 0;
  /* Costs quite a bit more to get all information */
  if (flag & HA_STATUS_TIME)
  {
    MY_STAT file_stat;  // Stat information for the data file

    VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));

    mean_rec_length= table->s->reclength + buffer.alloced_length();
    data_file_length= file_stat.st_size;
    create_time= file_stat.st_ctime;
    update_time= file_stat.st_mtime;
    max_data_file_length= share->rows_recorded * mean_rec_length;
  }
  delete_length= 0;
  index_file_length=0;

  if (flag & HA_STATUS_AUTO)
    auto_increment_value= share->auto_increment_value;

  DBUG_VOID_RETURN;
}


/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}


/* 
  The other side of start_bulk_insert is end_bulk_insert. Here we turn off the bulk insert
  flag, and set the share dirty so that the next select will call sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  share->dirty= TRUE;
  DBUG_RETURN(0);
}

/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by 
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
{
  DBUG_ENTER("ha_archive::delete_all_rows");
  DBUG_RETURN(0);
}

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const 
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed); 
}

/*
  Simple scan of the tables to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  byte *buf; 
  const char *old_proc_info=thd->proc_info;
  ha_rows count= share->rows_recorded;
  DBUG_ENTER("ha_archive::check");

  thd->proc_info= "Checking table";
  /* Flush any waiting data */
  azflush(&(share->archive_write), Z_SYNC_FLUSH);

  /* 
    First we create a buffer that we can use for reading rows, and can pass
    to get_row().
  */
  if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
    rc= HA_ERR_OUT_OF_MEM;

  /*
    Now we will rewind the archive file so that we are positioned at the 
    start of the file.
  */
  if (!rc)
    read_data_header(&archive);

  if (!rc)
    while (!(rc= get_row(&archive, buf)))
      count--;

  my_free((char*)buf, MYF(0));

  thd->proc_info= old_proc_info;

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)  
  {
    share->crashed= FALSE;
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }
  else
  {
    DBUG_RETURN(HA_ADMIN_OK);
  }
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd) 
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}