Commit 06b5e9bb authored by mats@romeo.(none)'s avatar mats@romeo.(none)

BUG#27441 (There is no COLS bitmap for the after image of an update

rows event):

Adding a after image COLS bitmap to Update_rows_log_event (for telling
what columns that are present in the after image of each row update).

Also fixing case where Rows_log_event length was not correctly computed
(happened when the number of columns in a table was more than 251). 
parent 6269e4f2
......@@ -39,7 +39,7 @@ Replicate_Wild_Ignore_Table
Last_Errno 1146
Last_Error Error 'Table 'test.t1' doesn't exist' on opening table `test`.`t1`
Skip_Counter 0
Exec_Master_Log_Pos 522
Exec_Master_Log_Pos 523
Relay_Log_Space #
Until_Condition None
Until_Log_File
......
......@@ -5604,7 +5604,10 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
}
else
m_cols.bitmap= 0; // to not free it
{
// Needed because bitmap_init() does not set it to null on failure
m_cols.bitmap= 0;
}
}
#endif
......@@ -5641,14 +5644,57 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_flags= uint2korr(post_start);
byte const *const var_start= (const byte *)buf + common_header_len +
post_header_len;
byte const *const var_start=
(const byte *)buf + common_header_len + post_header_len;
byte const *const ptr_width= var_start;
uchar *ptr_after_width= (uchar*) ptr_width;
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
m_width = net_field_length(&ptr_after_width);
DBUG_PRINT("debug", ("m_width=%lu", m_width));
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
(m_width + 7) & ~7UL,
false)))
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
ptr_after_width+= (m_width + 7) / 8;
DBUG_DUMP("m_cols", (char*) m_cols.bitmap, no_bytes_in_map(&m_cols));
}
else
{
// Needed because bitmap_init() does not set it to null on failure
m_cols.bitmap= NULL;
DBUG_VOID_RETURN;
}
const uint byte_count= (m_width + 7) / 8;
const byte* const ptr_rows_data= var_start + byte_count + 1;
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
if (event_type == UPDATE_ROWS_EVENT)
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols_ai,
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
(m_width + 7) & ~7UL,
false)))
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
ptr_after_width+= (m_width + 7) / 8;
DBUG_DUMP("m_cols_ai", (char*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai));
}
else
{
// Needed because bitmap_init() does not set it to null on failure
m_cols_ai.bitmap= 0;
DBUG_VOID_RETURN;
}
}
const byte* const ptr_rows_data= (const byte*) ptr_after_width;
my_size_t const data_size= event_len - (ptr_rows_data - (const byte *) buf);
DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu",
......@@ -5657,12 +5703,6 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_rows_buf= (byte*)my_malloc(data_size, MYF(MY_WME));
if (likely((bool)m_rows_buf))
{
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
(m_width + 7) & ~7UL,
false)))
memcpy(m_cols.bitmap, ptr_after_width, byte_count);
m_rows_end= m_rows_buf + data_size;
m_rows_cur= m_rows_end;
memcpy(m_rows_buf, ptr_rows_data, data_size);
......@@ -5681,6 +5721,29 @@ Rows_log_event::~Rows_log_event()
my_free((gptr)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR));
}
int Rows_log_event::get_data_size()
{
int const type_code= get_type_code();
char buf[sizeof(m_width)+1];
char *end= net_store_length(buf, (m_width + 7) / 8);
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
(type_code == UPDATE_ROWS_EVENT ? no_bytes_in_map(&m_cols_ai) : 0) +
(m_rows_cur - m_rows_buf););
int data_size= ROWS_HEADER_LEN;
data_size+= no_bytes_in_map(&m_cols);
data_size+= end - buf;
if (type_code == UPDATE_ROWS_EVENT)
data_size+= no_bytes_in_map(&m_cols_ai);
data_size+= (m_rows_cur - m_rows_buf);
return data_size;
}
#ifndef MYSQL_CLIENT
int Rows_log_event::do_add_row_data(byte *const row_data,
my_size_t const length)
......@@ -5911,7 +5974,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
If m_table_id == ~0UL, then we have a dummy event that does not
contain any data. In that case, we just remove all tables in the
tables_to_lock list, close the thread tables, and return with
success. The relay log position will be stepped in
success.
*/
if (m_table_id == ~0UL)
{
......@@ -6291,15 +6354,35 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
*/
char sbuf[sizeof(m_width)];
my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
bool res= false;
char *const sbuf_end= net_store_length((char*) sbuf, (uint) m_width);
DBUG_ASSERT(static_cast<my_size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
return (my_b_safe_write(file, reinterpret_cast<byte*>(sbuf),
sbuf_end - sbuf) ||
my_b_safe_write(file, reinterpret_cast<byte*>(m_cols.bitmap),
no_bytes_in_map(&m_cols)) ||
my_b_safe_write(file, m_rows_buf, (uint) data_size));
DBUG_DUMP("m_width", sbuf, sbuf_end - sbuf);
res= res || my_b_safe_write(file,
reinterpret_cast<byte*>(sbuf),
sbuf_end - sbuf);
DBUG_DUMP("m_cols", (char*) m_cols.bitmap, no_bytes_in_map(&m_cols));
res= res || my_b_safe_write(file,
reinterpret_cast<byte*>(m_cols.bitmap),
no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
if (get_type_code() == UPDATE_ROWS_EVENT)
{
DBUG_DUMP("m_cols_ai", (char*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai));
res= res || my_b_safe_write(file,
reinterpret_cast<byte*>(m_cols_ai.bitmap),
no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
res= res || my_b_safe_write(file, m_rows_buf, (uint) data_size);
return res;
}
#endif
......@@ -7584,16 +7667,55 @@ void Delete_rows_log_event::print(FILE *file,
*/
#if !defined(MYSQL_CLIENT)
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid, MY_BITMAP const *cols,
ulong tid,
MY_BITMAP const *cols_bi,
MY_BITMAP const *cols_ai,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional)
#ifdef HAVE_REPLICATION
, m_memory(NULL), m_key(NULL)
#endif
{
init(cols_ai);
}
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid,
MY_BITMAP const *cols,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
#ifdef HAVE_REPLICATION
, m_memory(NULL), m_key(NULL)
#endif
{
init(cols);
}
void Update_rows_log_event::init(MY_BITMAP const *cols)
{
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols_ai,
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
(m_width + 7) & ~7UL,
false)))
{
/* Cols can be zero if this is a dummy binrows event */
if (likely(cols != NULL))
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
}
}
#endif /* !defined(MYSQL_CLIENT) */
Update_rows_log_event::~Update_rows_log_event()
{
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
m_cols_ai.bitmap= 0; // so no my_free in bitmap_free
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
}
/*
Constructor used by slave to read the event from the binary log.
*/
......@@ -7678,7 +7800,7 @@ int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
store_record(table, record[1]);
char const *next_start = *row_end;
/* m_after_image is the after image for the update */
error= unpack_row(rli, table, m_width, next_start, &m_cols, row_end,
error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end,
&m_master_reclength, table->write_set, UPDATE_ROWS_EVENT);
bmove_align(m_after_image, table->record[0], table->s->reclength);
restore_record(table, record[1]);
......
......@@ -2160,14 +2160,7 @@ public:
#endif
/* Member functions to implement superclass interface */
virtual int get_data_size()
{
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
return 6 + 1 + no_bytes_in_map(&m_cols) +
(m_rows_cur - m_rows_buf););
return ROWS_HEADER_LEN + 1 + no_bytes_in_map(&m_cols) +
(m_rows_cur - m_rows_buf);
}
virtual int get_data_size();
MY_BITMAP const *get_cols() const { return &m_cols; }
my_size_t get_width() const { return m_width; }
......@@ -2178,9 +2171,14 @@ public:
virtual bool write_data_body(IO_CACHE *file);
virtual const char *get_db() { return m_table->s->db.str; }
#endif
/*
Check that malloc() succeeded in allocating memory for the rows
buffer and the COLS vector. Checking that an Update_rows_log_event
is valid is done in the Update_rows_log_event::is_valid()
function.
*/
virtual bool is_valid() const
{
/* that's how we check malloc() succeeded */
return m_rows_buf && m_cols.bitmap;
}
......@@ -2213,10 +2211,20 @@ protected:
ulong m_table_id; /* Table ID */
MY_BITMAP m_cols; /* Bitmap denoting columns available */
ulong m_width; /* The width of the columns bitmap */
/*
Bitmap for columns available in the after image, if present. These
fields are only available for Update_rows events. Observe that the
width of both the before image COLS vector and the after image
COLS vector is the same: the number of columns of the table on the
master.
*/
MY_BITMAP m_cols_ai;
ulong m_master_reclength; /* Length of record on master side */
/* Bit buffer in the same memory as the class */
/* Bit buffers in the same memory as the class */
uint32 m_bitbuf[128/(sizeof(uint32)*8)];
uint32 m_bitbuf_ai[128/(sizeof(uint32)*8)];
byte *m_rows_buf; /* The rows in packed format */
byte *m_rows_cur; /* One-after the end of the data */
......@@ -2376,10 +2384,20 @@ public:
};
#ifndef MYSQL_CLIENT
Update_rows_log_event(THD*, TABLE*, ulong table_id,
MY_BITMAP const *cols, bool is_transactional);
Update_rows_log_event(THD*, TABLE*, ulong table_id,
MY_BITMAP const *cols_bi,
MY_BITMAP const *cols_ai,
bool is_transactional);
Update_rows_log_event(THD*, TABLE*, ulong table_id,
MY_BITMAP const *cols,
bool is_transactional);
void init(MY_BITMAP const *cols);
#endif
virtual ~Update_rows_log_event();
#ifdef HAVE_REPLICATION
Update_rows_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event);
......@@ -2398,6 +2416,11 @@ public:
}
#endif
virtual bool is_valid() const
{
return Rows_log_event::is_valid() && m_cols_ai.bitmap;
}
private:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment