Commit 32b9ab07 authored by unknown's avatar unknown

BUG#27441 (There is no COLS bitmap for the after image of an update

rows event):

Adding a after image COLS bitmap to Update_rows_log_event (for telling
what columns that are present in the after image of each row update).

Also fixing case where Rows_log_event length was not correctly computed
(happened when the number of columns in a table was more than 251). 


mysql-test/r/rpl_row_inexist_tbl.result:
  Result change.
sql/log_event.cc:
  Extending Rows_log_event with two new fields: m_bitbuf_ai and m_cols_ai.  These fields are only used for the Update_rows_log_event.
  Adding implementation of Update_rows_log_event destructor.
  Using new after image fields inside the Update_rows_log_event.
  Factoring out common constructor bodies into Update_rows_log_event::init()
  function.
  
  Fixing case where length of Rows_log_event was not correctly computed (for
  tables with more than 251 columns).
sql/log_event.h:
  Moving implementation of Rows_log_event::get_data_size() into .cc file.
  Adding Update_rows_log_event constructor accepting both before image
  and after image COLS vector.
  Adding Update_rows_log_vector destructor.
  Adding fields m_bitbuf_ai and m_cols_ai to Rows_log_event.
  Fixing is_valid() to look at m_cols_ai as well.
parent e1cf0cb8
...@@ -39,7 +39,7 @@ Replicate_Wild_Ignore_Table ...@@ -39,7 +39,7 @@ Replicate_Wild_Ignore_Table
Last_Errno 1146 Last_Errno 1146
Last_Error Error 'Table 'test.t1' doesn't exist' on opening table `test`.`t1` Last_Error Error 'Table 'test.t1' doesn't exist' on opening table `test`.`t1`
Skip_Counter 0 Skip_Counter 0
Exec_Master_Log_Pos 522 Exec_Master_Log_Pos 523
Relay_Log_Space # Relay_Log_Space #
Until_Condition None Until_Condition None
Until_Log_File Until_Log_File
......
...@@ -5604,7 +5604,10 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, ...@@ -5604,7 +5604,10 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols)); memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
} }
else else
m_cols.bitmap= 0; // to not free it {
// Needed because bitmap_init() does not set it to null on failure
m_cols.bitmap= 0;
}
} }
#endif #endif
...@@ -5641,14 +5644,57 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, ...@@ -5641,14 +5644,57 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_flags= uint2korr(post_start); m_flags= uint2korr(post_start);
byte const *const var_start= (const byte *)buf + common_header_len + byte const *const var_start=
post_header_len; (const byte *)buf + common_header_len + post_header_len;
byte const *const ptr_width= var_start; byte const *const ptr_width= var_start;
uchar *ptr_after_width= (uchar*) ptr_width; uchar *ptr_after_width= (uchar*) ptr_width;
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
m_width = net_field_length(&ptr_after_width); m_width = net_field_length(&ptr_after_width);
DBUG_PRINT("debug", ("m_width=%lu", m_width));
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
(m_width + 7) & ~7UL,
false)))
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
ptr_after_width+= (m_width + 7) / 8;
DBUG_DUMP("m_cols", (char*) m_cols.bitmap, no_bytes_in_map(&m_cols));
}
else
{
// Needed because bitmap_init() does not set it to null on failure
m_cols.bitmap= NULL;
DBUG_VOID_RETURN;
}
const uint byte_count= (m_width + 7) / 8; m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
const byte* const ptr_rows_data= var_start + byte_count + 1;
if (event_type == UPDATE_ROWS_EVENT)
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols_ai,
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
(m_width + 7) & ~7UL,
false)))
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
ptr_after_width+= (m_width + 7) / 8;
DBUG_DUMP("m_cols_ai", (char*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai));
}
else
{
// Needed because bitmap_init() does not set it to null on failure
m_cols_ai.bitmap= 0;
DBUG_VOID_RETURN;
}
}
const byte* const ptr_rows_data= (const byte*) ptr_after_width;
my_size_t const data_size= event_len - (ptr_rows_data - (const byte *) buf); my_size_t const data_size= event_len - (ptr_rows_data - (const byte *) buf);
DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu", DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu",
...@@ -5657,12 +5703,6 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, ...@@ -5657,12 +5703,6 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_rows_buf= (byte*)my_malloc(data_size, MYF(MY_WME)); m_rows_buf= (byte*)my_malloc(data_size, MYF(MY_WME));
if (likely((bool)m_rows_buf)) if (likely((bool)m_rows_buf))
{ {
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
(m_width + 7) & ~7UL,
false)))
memcpy(m_cols.bitmap, ptr_after_width, byte_count);
m_rows_end= m_rows_buf + data_size; m_rows_end= m_rows_buf + data_size;
m_rows_cur= m_rows_end; m_rows_cur= m_rows_end;
memcpy(m_rows_buf, ptr_rows_data, data_size); memcpy(m_rows_buf, ptr_rows_data, data_size);
...@@ -5681,6 +5721,29 @@ Rows_log_event::~Rows_log_event() ...@@ -5681,6 +5721,29 @@ Rows_log_event::~Rows_log_event()
my_free((gptr)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR)); my_free((gptr)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR));
} }
int Rows_log_event::get_data_size()
{
int const type_code= get_type_code();
char buf[sizeof(m_width)+1];
char *end= net_store_length(buf, (m_width + 7) / 8);
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
(type_code == UPDATE_ROWS_EVENT ? no_bytes_in_map(&m_cols_ai) : 0) +
(m_rows_cur - m_rows_buf););
int data_size= ROWS_HEADER_LEN;
data_size+= no_bytes_in_map(&m_cols);
data_size+= end - buf;
if (type_code == UPDATE_ROWS_EVENT)
data_size+= no_bytes_in_map(&m_cols_ai);
data_size+= (m_rows_cur - m_rows_buf);
return data_size;
}
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
int Rows_log_event::do_add_row_data(byte *const row_data, int Rows_log_event::do_add_row_data(byte *const row_data,
my_size_t const length) my_size_t const length)
...@@ -5911,7 +5974,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) ...@@ -5911,7 +5974,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
If m_table_id == ~0UL, then we have a dummy event that does not If m_table_id == ~0UL, then we have a dummy event that does not
contain any data. In that case, we just remove all tables in the contain any data. In that case, we just remove all tables in the
tables_to_lock list, close the thread tables, and return with tables_to_lock list, close the thread tables, and return with
success. The relay log position will be stepped in success.
*/ */
if (m_table_id == ~0UL) if (m_table_id == ~0UL)
{ {
...@@ -6291,15 +6354,35 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) ...@@ -6291,15 +6354,35 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
*/ */
char sbuf[sizeof(m_width)]; char sbuf[sizeof(m_width)];
my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf; my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
bool res= false;
char *const sbuf_end= net_store_length((char*) sbuf, (uint) m_width); char *const sbuf_end= net_store_length((char*) sbuf, (uint) m_width);
DBUG_ASSERT(static_cast<my_size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_ASSERT(static_cast<my_size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
return (my_b_safe_write(file, reinterpret_cast<byte*>(sbuf), DBUG_DUMP("m_width", sbuf, sbuf_end - sbuf);
sbuf_end - sbuf) || res= res || my_b_safe_write(file,
my_b_safe_write(file, reinterpret_cast<byte*>(m_cols.bitmap), reinterpret_cast<byte*>(sbuf),
no_bytes_in_map(&m_cols)) || sbuf_end - sbuf);
my_b_safe_write(file, m_rows_buf, (uint) data_size));
DBUG_DUMP("m_cols", (char*) m_cols.bitmap, no_bytes_in_map(&m_cols));
res= res || my_b_safe_write(file,
reinterpret_cast<byte*>(m_cols.bitmap),
no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
if (get_type_code() == UPDATE_ROWS_EVENT)
{
DBUG_DUMP("m_cols_ai", (char*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai));
res= res || my_b_safe_write(file,
reinterpret_cast<byte*>(m_cols_ai.bitmap),
no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
res= res || my_b_safe_write(file, m_rows_buf, (uint) data_size);
return res;
} }
#endif #endif
...@@ -7584,16 +7667,55 @@ void Delete_rows_log_event::print(FILE *file, ...@@ -7584,16 +7667,55 @@ void Delete_rows_log_event::print(FILE *file,
*/ */
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid, MY_BITMAP const *cols, ulong tid,
MY_BITMAP const *cols_bi,
MY_BITMAP const *cols_ai,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional)
#ifdef HAVE_REPLICATION
, m_memory(NULL), m_key(NULL)
#endif
{
init(cols_ai);
}
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid,
MY_BITMAP const *cols,
bool is_transactional) bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
, m_memory(NULL), m_key(NULL) , m_memory(NULL), m_key(NULL)
#endif #endif
{ {
init(cols);
}
void Update_rows_log_event::init(MY_BITMAP const *cols)
{
/* if bitmap_init fails, catched in is_valid() */
if (likely(!bitmap_init(&m_cols_ai,
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
(m_width + 7) & ~7UL,
false)))
{
/* Cols can be zero if this is a dummy binrows event */
if (likely(cols != NULL))
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
}
} }
#endif /* !defined(MYSQL_CLIENT) */ #endif /* !defined(MYSQL_CLIENT) */
Update_rows_log_event::~Update_rows_log_event()
{
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
m_cols_ai.bitmap= 0; // so no my_free in bitmap_free
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
}
/* /*
Constructor used by slave to read the event from the binary log. Constructor used by slave to read the event from the binary log.
*/ */
...@@ -7678,7 +7800,7 @@ int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli, ...@@ -7678,7 +7800,7 @@ int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
store_record(table, record[1]); store_record(table, record[1]);
char const *next_start = *row_end; char const *next_start = *row_end;
/* m_after_image is the after image for the update */ /* m_after_image is the after image for the update */
error= unpack_row(rli, table, m_width, next_start, &m_cols, row_end, error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end,
&m_master_reclength, table->write_set, UPDATE_ROWS_EVENT); &m_master_reclength, table->write_set, UPDATE_ROWS_EVENT);
bmove_align(m_after_image, table->record[0], table->s->reclength); bmove_align(m_after_image, table->record[0], table->s->reclength);
restore_record(table, record[1]); restore_record(table, record[1]);
......
...@@ -2160,14 +2160,7 @@ public: ...@@ -2160,14 +2160,7 @@ public:
#endif #endif
/* Member functions to implement superclass interface */ /* Member functions to implement superclass interface */
virtual int get_data_size() virtual int get_data_size();
{
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
return 6 + 1 + no_bytes_in_map(&m_cols) +
(m_rows_cur - m_rows_buf););
return ROWS_HEADER_LEN + 1 + no_bytes_in_map(&m_cols) +
(m_rows_cur - m_rows_buf);
}
MY_BITMAP const *get_cols() const { return &m_cols; } MY_BITMAP const *get_cols() const { return &m_cols; }
my_size_t get_width() const { return m_width; } my_size_t get_width() const { return m_width; }
...@@ -2178,9 +2171,14 @@ public: ...@@ -2178,9 +2171,14 @@ public:
virtual bool write_data_body(IO_CACHE *file); virtual bool write_data_body(IO_CACHE *file);
virtual const char *get_db() { return m_table->s->db.str; } virtual const char *get_db() { return m_table->s->db.str; }
#endif #endif
/*
Check that malloc() succeeded in allocating memory for the rows
buffer and the COLS vector. Checking that an Update_rows_log_event
is valid is done in the Update_rows_log_event::is_valid()
function.
*/
virtual bool is_valid() const virtual bool is_valid() const
{ {
/* that's how we check malloc() succeeded */
return m_rows_buf && m_cols.bitmap; return m_rows_buf && m_cols.bitmap;
} }
...@@ -2213,10 +2211,20 @@ protected: ...@@ -2213,10 +2211,20 @@ protected:
ulong m_table_id; /* Table ID */ ulong m_table_id; /* Table ID */
MY_BITMAP m_cols; /* Bitmap denoting columns available */ MY_BITMAP m_cols; /* Bitmap denoting columns available */
ulong m_width; /* The width of the columns bitmap */ ulong m_width; /* The width of the columns bitmap */
/*
Bitmap for columns available in the after image, if present. These
fields are only available for Update_rows events. Observe that the
width of both the before image COLS vector and the after image
COLS vector is the same: the number of columns of the table on the
master.
*/
MY_BITMAP m_cols_ai;
ulong m_master_reclength; /* Length of record on master side */ ulong m_master_reclength; /* Length of record on master side */
/* Bit buffer in the same memory as the class */ /* Bit buffers in the same memory as the class */
uint32 m_bitbuf[128/(sizeof(uint32)*8)]; uint32 m_bitbuf[128/(sizeof(uint32)*8)];
uint32 m_bitbuf_ai[128/(sizeof(uint32)*8)];
byte *m_rows_buf; /* The rows in packed format */ byte *m_rows_buf; /* The rows in packed format */
byte *m_rows_cur; /* One-after the end of the data */ byte *m_rows_cur; /* One-after the end of the data */
...@@ -2376,10 +2384,20 @@ public: ...@@ -2376,10 +2384,20 @@ public:
}; };
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Update_rows_log_event(THD*, TABLE*, ulong table_id, Update_rows_log_event(THD*, TABLE*, ulong table_id,
MY_BITMAP const *cols, bool is_transactional); MY_BITMAP const *cols_bi,
MY_BITMAP const *cols_ai,
bool is_transactional);
Update_rows_log_event(THD*, TABLE*, ulong table_id,
MY_BITMAP const *cols,
bool is_transactional);
void init(MY_BITMAP const *cols);
#endif #endif
virtual ~Update_rows_log_event();
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
Update_rows_log_event(const char *buf, uint event_len, Update_rows_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event); const Format_description_log_event *description_event);
...@@ -2398,6 +2416,11 @@ public: ...@@ -2398,6 +2416,11 @@ public:
} }
#endif #endif
virtual bool is_valid() const
{
return Rows_log_event::is_valid() && m_cols_ai.bitmap;
}
private: private:
virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; } virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment