Commit 4e8d92e5 authored by brian@zim.(none)'s avatar brian@zim.(none)

Final cleanup for new archive internal format. All new headers work.

parent ab1a97f5
...@@ -26,12 +26,15 @@ int main(int argc, char *argv[]) ...@@ -26,12 +26,15 @@ int main(int argc, char *argv[])
printf("Version :%u\n", reader_handle.version); printf("Version :%u\n", reader_handle.version);
printf("Start position :%llu\n", (unsigned long long)reader_handle.start); printf("Start position :%llu\n", (unsigned long long)reader_handle.start);
printf("Block size :%u\n", reader_handle.block_size); if (reader_handle.version > 2)
printf("Rows: %llu\n", reader_handle.rows); {
printf("Autoincrement: %llu\n", reader_handle.auto_increment); printf("Block size :%u\n", reader_handle.block_size);
printf("Check Point: %llu\n", reader_handle.check_point); printf("Rows: %llu\n", reader_handle.rows);
printf("Forced Flushes: %llu\n", reader_handle.forced_flushes); printf("Autoincrement: %llu\n", reader_handle.auto_increment);
printf("State: %s\n", ( reader_handle.dirty ? "dirty" : "clean")); printf("Check Point: %llu\n", reader_handle.check_point);
printf("Forced Flushes: %llu\n", reader_handle.forced_flushes);
printf("State: %s\n", ( reader_handle.dirty ? "dirty" : "clean"));
}
azclose(&reader_handle); azclose(&reader_handle);
......
...@@ -49,7 +49,7 @@ void read_header(azio_stream *s, unsigned char *buffer); ...@@ -49,7 +49,7 @@ void read_header(azio_stream *s, unsigned char *buffer);
int az_open (azio_stream *s, const char *path, int Flags, File fd) int az_open (azio_stream *s, const char *path, int Flags, File fd)
{ {
int err; int err;
int level = Z_NO_COMPRESSION; /* Z_DEFAULT_COMPRESSION;*/ /* compression level */ int level = Z_DEFAULT_COMPRESSION; /* compression level */
int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */ int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */
s->stream.zalloc = (alloc_func)0; s->stream.zalloc = (alloc_func)0;
...@@ -165,7 +165,6 @@ void write_header(azio_stream *s) ...@@ -165,7 +165,6 @@ void write_header(azio_stream *s)
int4store(ptr + AZ_FRM_POS, 0); /* FRM Block */ int4store(ptr + AZ_FRM_POS, 0); /* FRM Block */
int4store(ptr + AZ_META_POS, 0); /* Meta Block */ int4store(ptr + AZ_META_POS, 0); /* Meta Block */
int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */ int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */
printf("ROWS %llu\n", s->rows);
int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */ int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */
int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */ int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */
int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */ int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */
......
...@@ -460,7 +460,8 @@ int ha_archive::open(const char *name, int mode, uint open_options) ...@@ -460,7 +460,8 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_ASSERT(share); DBUG_ASSERT(share);
record_buffer= create_record_buffer(table->s->reclength); record_buffer= create_record_buffer(table->s->reclength +
ARCHIVE_ROW_HEADER_SIZE);
if (!record_buffer) if (!record_buffer)
{ {
...@@ -642,9 +643,12 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer) ...@@ -642,9 +643,12 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
} }
/* Calculate max length needed for row */ /*
Calculate max length needed for row. This includes
the bytes required for the length in the header.
*/
int ha_archive::max_row_length(const byte *buf) uint32 ha_archive::max_row_length(const byte *buf)
{ {
ulonglong length= table->s->reclength + table->s->fields*2; ulonglong length= table->s->reclength + table->s->fields*2;
length+= ARCHIVE_ROW_HEADER_SIZE; length+= ARCHIVE_ROW_HEADER_SIZE;
...@@ -654,26 +658,23 @@ int ha_archive::max_row_length(const byte *buf) ...@@ -654,26 +658,23 @@ int ha_archive::max_row_length(const byte *buf)
ptr != end ; ptr != end ;
ptr++) ptr++)
{ {
Field_blob *blob= ((Field_blob*) table->field[*ptr]); length += 2 + ((Field_blob*) table->field[*ptr])->get_length();
length+= blob->get_length((char*) buf + blob->offset())+2;
} }
return length; return length;
} }
unsigned int ha_archive::pack_row(const byte *record) unsigned int ha_archive::pack_row(byte *record)
{ {
byte *ptr; byte *ptr;
ulonglong full_length;
DBUG_ENTER("ha_archive::pack_row"); DBUG_ENTER("ha_archive::pack_row");
if (table->s->blob_fields)
{
if (fix_rec_buff(max_row_length(record)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
}
if (fix_rec_buff(max_row_length(record)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
/* Copy null bits */ /* Copy null bits */
memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE, memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
...@@ -681,8 +682,10 @@ unsigned int ha_archive::pack_row(const byte *record) ...@@ -681,8 +682,10 @@ unsigned int ha_archive::pack_row(const byte *record)
ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE; ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
for (Field **field=table->field ; *field ; field++) for (Field **field=table->field ; *field ; field++)
{
ptr=(byte*) (*field)->pack((char*) ptr, ptr=(byte*) (*field)->pack((char*) ptr,
(char*) record + (*field)->offset()); (char*) record + (*field)->offset(record));
}
int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer - int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
ARCHIVE_ROW_HEADER_SIZE)); ARCHIVE_ROW_HEADER_SIZE));
...@@ -715,12 +718,16 @@ int ha_archive::write_row(byte *buf) ...@@ -715,12 +718,16 @@ int ha_archive::write_row(byte *buf)
if (share->crashed) if (share->crashed)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
ha_statistic_increment(&SSV::ha_write_count); ha_statistic_increment(&SSV::ha_write_count);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
if (table->next_number_field) if (table->next_number_field && record == table->record[0])
{ {
KEY *mkey= &table->s->key_info[0]; // We only support one key right now KEY *mkey= &table->s->key_info[0]; // We only support one key right now
update_auto_increment(); update_auto_increment();
...@@ -736,7 +743,6 @@ int ha_archive::write_row(byte *buf) ...@@ -736,7 +743,6 @@ int ha_archive::write_row(byte *buf)
rc= HA_ERR_FOUND_DUPP_KEY; rc= HA_ERR_FOUND_DUPP_KEY;
goto error; goto error;
} }
/* /*
Bad news, this will cause a search for the unique value which is very Bad news, this will cause a search for the unique value which is very
expensive since we will have to do a table scan which will lock up expensive since we will have to do a table scan which will lock up
...@@ -785,7 +791,8 @@ int ha_archive::write_row(byte *buf) ...@@ -785,7 +791,8 @@ int ha_archive::write_row(byte *buf)
else else
{ {
if (temp_auto > share->archive_write.auto_increment) if (temp_auto > share->archive_write.auto_increment)
stats.auto_increment_value= share->archive_write.auto_increment= temp_auto; stats.auto_increment_value= share->archive_write.auto_increment=
temp_auto;
} }
} }
...@@ -793,10 +800,6 @@ int ha_archive::write_row(byte *buf) ...@@ -793,10 +800,6 @@ int ha_archive::write_row(byte *buf)
Notice that the global auto_increment has been increased. Notice that the global auto_increment has been increased.
In case of a failed row write, we will never try to reuse the value. In case of a failed row write, we will never try to reuse the value.
*/ */
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
share->rows_recorded++; share->rows_recorded++;
rc= real_write_row(buf, &(share->archive_write)); rc= real_write_row(buf, &(share->archive_write));
error: error:
...@@ -972,20 +975,27 @@ int ha_archive::get_row(azio_stream *file_to_read, byte *buf) ...@@ -972,20 +975,27 @@ int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
} }
/* Reallocate buffer if needed */ /* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(int length) bool ha_archive::fix_rec_buff(unsigned int length)
{ {
if (! record_buffer->buffer || DBUG_ENTER("ha_archive::fix_rec_buff");
length > (record_buffer->length + ARCHIVE_ROW_HEADER_SIZE)) DBUG_PRINT("ha_archive", ("Fixing %u for %u",
length, record_buffer->length));
DBUG_ASSERT(record_buffer->buffer);
if (length > record_buffer->length);
{ {
byte *newptr; byte *newptr;
if (!(newptr=(byte*) my_realloc((gptr) record_buffer->buffer, if (!(newptr=(byte*) my_realloc((gptr) record_buffer->buffer,
length + ARCHIVE_ROW_HEADER_SIZE, length,
MYF(MY_ALLOW_ZERO_PTR)))) MYF(MY_ALLOW_ZERO_PTR))))
return 1; /* purecov: inspected */ DBUG_RETURN(1);
record_buffer->buffer= newptr; record_buffer->buffer= newptr;
record_buffer->length= length; record_buffer->length= length;
} }
return 0;
DBUG_ASSERT(length <= record_buffer->length);
DBUG_RETURN(0);
} }
int ha_archive::unpack_row(azio_stream *file_to_read, char *record) int ha_archive::unpack_row(azio_stream *file_to_read, char *record)
...@@ -1011,6 +1021,7 @@ int ha_archive::unpack_row(azio_stream *file_to_read, char *record) ...@@ -1011,6 +1021,7 @@ int ha_archive::unpack_row(azio_stream *file_to_read, char *record)
DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len, DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
(unsigned int)table->s->reclength)); (unsigned int)table->s->reclength));
fix_rec_buff(row_len); fix_rec_buff(row_len);
DBUG_ASSERT(row_len <= record_buffer->length);
read= azread(file_to_read, record_buffer->buffer, row_len, &error); read= azread(file_to_read, record_buffer->buffer, row_len, &error);
...@@ -1026,7 +1037,7 @@ int ha_archive::unpack_row(azio_stream *file_to_read, char *record) ...@@ -1026,7 +1037,7 @@ int ha_archive::unpack_row(azio_stream *file_to_read, char *record)
memcpy(record, ptr, table->s->null_bytes); memcpy(record, ptr, table->s->null_bytes);
ptr+= table->s->null_bytes; ptr+= table->s->null_bytes;
for (Field **field=table->field ; *field ; field++) for (Field **field=table->field ; *field ; field++)
ptr= (*field)->unpack(record + (*field)->offset(), ptr); ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1052,6 +1063,10 @@ int ha_archive::get_row_version2(azio_stream *file_to_read, byte *buf) ...@@ -1052,6 +1063,10 @@ int ha_archive::get_row_version2(azio_stream *file_to_read, byte *buf)
read= azread(file_to_read, buf, table->s->reclength, &error); read= azread(file_to_read, buf, table->s->reclength, &error);
/* If we read nothing we are at the end of the file */
if (read == 0)
DBUG_RETURN(HA_ERR_END_OF_FILE);
if (read != table->s->reclength) if (read != table->s->reclength)
{ {
DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u", DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
...@@ -1063,10 +1078,6 @@ int ha_archive::get_row_version2(azio_stream *file_to_read, byte *buf) ...@@ -1063,10 +1078,6 @@ int ha_archive::get_row_version2(azio_stream *file_to_read, byte *buf)
if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR ) if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/* If we read nothing we are at the end of the file */
if (read == 0)
DBUG_RETURN(HA_ERR_END_OF_FILE);
/* /*
If the record is the wrong size, the file is probably damaged, unless If the record is the wrong size, the file is probably damaged, unless
we are dealing with a delayed insert or a bulk insert. we are dealing with a delayed insert or a bulk insert.
...@@ -1209,15 +1220,9 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1209,15 +1220,9 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
char writer_filename[FN_REFLEN]; char writer_filename[FN_REFLEN];
/* Open up the writer if we haven't yet */ /* Open up the writer if we haven't yet */
/* Flush any waiting data */
if (share->archive_write_open) if (share->archive_write_open)
{
/* Flush any waiting data */
azflush(&(share->archive_write), Z_SYNC_FLUSH); azflush(&(share->archive_write), Z_SYNC_FLUSH);
}
else
{
init_archive_writer();
}
/* Lets create a file to contain the new data */ /* Lets create a file to contain the new data */
fn_format(writer_filename, share->table_name, "", ARN, fn_format(writer_filename, share->table_name, "", ARN,
...@@ -1236,29 +1241,6 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1236,29 +1241,6 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if (1) if (1)
{ {
DBUG_PRINT("ha_archive", ("archive extended rebuild")); DBUG_PRINT("ha_archive", ("archive extended rebuild"));
byte *buf;
archive_record_buffer *write_buffer, *read_buffer, *original_buffer;
original_buffer= record_buffer;
/*
First we create a buffer that we can use for reading rows, and can pass
to get_row().
*/
if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
{
rc= HA_ERR_OUT_OF_MEM;
goto error;
}
read_buffer= create_record_buffer(record_buffer->length);
write_buffer= create_record_buffer(record_buffer->length);
if (!write_buffer || !read_buffer)
{
rc= HA_ERR_OUT_OF_MEM;
goto error;
}
/* /*
Now we will rewind the archive file so that we are positioned at the Now we will rewind the archive file so that we are positioned at the
...@@ -1274,12 +1256,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1274,12 +1256,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{ {
share->rows_recorded= 0; share->rows_recorded= 0;
stats.auto_increment_value= share->archive_write.auto_increment= 0; stats.auto_increment_value= share->archive_write.auto_increment= 0;
record_buffer= read_buffer; my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
while (!(rc= get_row(&archive, buf))) while (!(rc= get_row(&archive, table->record[0])))
{ {
record_buffer= write_buffer; real_write_row(table->record[0], &writer);
real_write_row(buf, &writer);
/* /*
Long term it should be possible to optimize this so that Long term it should be possible to optimize this so that
it is not called on each row. it is not called on each row.
...@@ -1288,31 +1269,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1288,31 +1269,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{ {
Field *field= table->found_next_number_field; Field *field= table->found_next_number_field;
ulonglong auto_value= ulonglong auto_value=
(ulonglong) field->val_int((char*)(buf + (ulonglong) field->val_int((char*)(table->record[0] +
field->offset(table->record[0]))); field->offset(table->record[0])));
if (share->archive_write.auto_increment < auto_value) if (share->archive_write.auto_increment < auto_value)
stats.auto_increment_value= share->archive_write.auto_increment= stats.auto_increment_value= share->archive_write.auto_increment=
auto_value; auto_value;
} }
record_buffer= read_buffer;
} }
share->rows_recorded= archive.rows; dbug_tmp_restore_column_map(table->read_set, org_bitmap);
stats.auto_increment_value= share->archive_write.auto_increment= share->rows_recorded= writer.rows;
writer.auto_increment= archive.auto_increment;
DBUG_PRINT("ha_archive", ("auto to save %llu", writer.auto_increment));
} }
DBUG_PRINT("info", ("recovered %llu archive rows", DBUG_PRINT("info", ("recovered %llu archive rows",
(unsigned long long)share->rows_recorded)); (unsigned long long)share->rows_recorded));
DBUG_PRINT("ha_archive", ("recovered %llu archive rows", DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
(unsigned long long)share->rows_recorded)); (unsigned long long)share->rows_recorded));
record_buffer= original_buffer;
destroy_record_buffer(read_buffer);
destroy_record_buffer(write_buffer);
my_free((char*)buf, MYF(0));
if (rc && rc != HA_ERR_END_OF_FILE) if (rc && rc != HA_ERR_END_OF_FILE)
goto error; goto error;
} }
...@@ -1322,29 +1296,16 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1322,29 +1296,16 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
// now we close both our writer and our reader for the rename // now we close both our writer and our reader for the rename
azclose(&(share->archive_write)); azclose(&(share->archive_write));
share->archive_write_open= 0;
azclose(&archive); azclose(&archive);
// make the file we just wrote be our data file // make the file we just wrote be our data file
rc = my_rename(writer_filename,share->data_file_name,MYF(0)); rc = my_rename(writer_filename,share->data_file_name,MYF(0));
/*
now open the shared writer back up
we don't check rc here because we want to open the file back up even
if the optimize failed but we will return rc below so that we will
know it failed.
We also need to reopen our read descriptor since it has changed.
*/
DBUG_PRINT("ha_archive", ("Reopening archive data file"));
if (!azopen(&(share->archive_write), share->data_file_name,
O_RDWR|O_BINARY) ||
!azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))
{
DBUG_PRINT("ha_archive", ("Could not open archive write file"));
rc= HA_ERR_CRASHED_ON_USAGE;
}
DBUG_RETURN(rc); DBUG_RETURN(rc);
error: error:
DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
azclose(&writer); azclose(&writer);
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -1437,10 +1398,11 @@ int ha_archive::info(uint flag) ...@@ -1437,10 +1398,11 @@ int ha_archive::info(uint flag)
stats.delete_length= 0; stats.delete_length= 0;
stats.index_file_length=0; stats.index_file_length=0;
/*
if (flag & HA_STATUS_AUTO) if (flag & HA_STATUS_AUTO)
stats.auto_increment_value= share->archive_write.auto_increment; {
*/ azflush(&archive, Z_SYNC_FLUSH);
stats.auto_increment_value= archive.auto_increment;
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1555,7 +1517,7 @@ bool ha_archive::check_and_repair(THD *thd) ...@@ -1555,7 +1517,7 @@ bool ha_archive::check_and_repair(THD *thd)
DBUG_RETURN(repair(thd, &check_opt)); DBUG_RETURN(repair(thd, &check_opt));
} }
archive_record_buffer *ha_archive::create_record_buffer(ulonglong length) archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{ {
DBUG_ENTER("ha_archive::create_record_buffer"); DBUG_ENTER("ha_archive::create_record_buffer");
archive_record_buffer *r; archive_record_buffer *r;
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
typedef struct st_archive_record_buffer { typedef struct st_archive_record_buffer {
byte *buffer; byte *buffer;
int length; uint32 length;
} archive_record_buffer; } archive_record_buffer;
...@@ -71,7 +71,7 @@ class ha_archive: public handler ...@@ -71,7 +71,7 @@ class ha_archive: public handler
uint current_k_offset; uint current_k_offset;
archive_record_buffer *record_buffer; archive_record_buffer *record_buffer;
archive_record_buffer *create_record_buffer(ulonglong length); archive_record_buffer *create_record_buffer(unsigned int length);
void destroy_record_buffer(archive_record_buffer *r); void destroy_record_buffer(archive_record_buffer *r);
public: public:
...@@ -137,9 +137,9 @@ public: ...@@ -137,9 +137,9 @@ public:
bool is_crashed() const; bool is_crashed() const;
int check(THD* thd, HA_CHECK_OPT* check_opt); int check(THD* thd, HA_CHECK_OPT* check_opt);
bool check_and_repair(THD *thd); bool check_and_repair(THD *thd);
int max_row_length(const byte *buf); uint32 max_row_length(const byte *buf);
bool fix_rec_buff(int length); bool fix_rec_buff(unsigned int length);
int unpack_row(azio_stream *file_to_read, char *record); int unpack_row(azio_stream *file_to_read, char *record);
unsigned int pack_row(const byte *record); unsigned int pack_row(byte *record);
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment