Commit 8d9cf89e authored by Mats Kindahl's avatar Mats Kindahl

Bug #38360: BLACKHOLE replication with RBR is broken

The Blackhole engine did not support row-based replication
since the delete_row(), update_row(), and the index and range
searching functions were not implemented.

This patch adds row-based replication support for the
Blackhole engine by implementing the two functions mentioned
above, and making the engine pretend that it has found the
correct row to delete or update when executed from the slave
SQL thread by implementing index and range searching functions.

It is necessary to only pretend this for the SQL thread, since
a SELECT executed on the Blackhole engine will otherwise never
return EOF, causing a livelock.


mysql-test/extra/binlog_tests/blackhole.test:
  Blackhole now handles row-based replication.
mysql-test/extra/rpl_tests/rpl_blackhole.test:
  Test helper file for testing that blackhole actually
  writes something to the binary log on the slave.
mysql-test/suite/binlog/t/binlog_multi_engine.test:
  Replication now handles row-based replcation.
mysql-test/suite/rpl/t/rpl_blackhole.test:
  Test that Blackhole works with primary key, index, or none.
sql/log_event.cc:
  Correcting code to only touch filler bits and leave
  all other bits alone. It is necessary since there is
  no guarantee that the engine will be able to fill in
  the bits correctly (e.g., the blackhole engine).
storage/blackhole/ha_blackhole.cc:
  Adding definitions for update_row() and delete_row() to return OK
  when executed from the slave SQL thread with thd->query == NULL
  (indicating that row-based replication events are being processed).
  
  Changing rnd_next(), index_read(), index_read_idx(), and
  index_read_last() to return OK when executed from the slave SQL
  thread (faking that the row has been found so that processing
  proceeds to update/delete the row).
storage/blackhole/ha_blackhole.h:
  Enabling row capabilities for engine.
  Defining write_row(), update_row(), and delete_row().
  Making write_row() private (as it should be).
parent 70e2f814
...@@ -139,15 +139,6 @@ drop table t1,t2,t3; ...@@ -139,15 +139,6 @@ drop table t1,t2,t3;
# table # table
# #
CREATE TABLE t1(a INT) ENGINE=BLACKHOLE; CREATE TABLE t1(a INT) ENGINE=BLACKHOLE;
# NOTE: After exchanging open_ltable() by open_and_lock_tables() in
# handle_delayed_insert() to fix problems with MERGE tables (Bug#26379),
# problems with INSERT DELAYED and BLACKHOLE popped up. open_ltable()
# does not check if the binlogging capabilities of the statement and the
# table match. So the below used to succeed. But since INSERT DELAYED
# switches to row-based logging in mixed-mode and BLACKHOLE cannot do
# row-based logging, it could not really work. Until this problem is
# correctly fixed, we have that error here.
--error ER_BINLOG_LOGGING_IMPOSSIBLE
INSERT DELAYED INTO t1 VALUES(1); INSERT DELAYED INTO t1 VALUES(1);
DROP TABLE t1; DROP TABLE t1;
......
connection slave;
let $before = query_get_value("SHOW MASTER STATUS", Position, 1);
--echo [on master]
connection master;
eval $statement;
--echo [on slave]
sync_slave_with_master;
--echo # Expect 0
SELECT COUNT(*) FROM t1;
let $after = query_get_value("SHOW MASTER STATUS", Position, 1);
let $something_written = `select $after - $before != 0`;
if ($something_written) {
--echo >>> Something was written to binary log <<<
}
if (!$something_written) {
--echo >>> Nothing was written to binary log <<<
}
...@@ -43,8 +43,6 @@ INSERT INTO t1n VALUES (1,1), (1,2), (2,1), (2,2); ...@@ -43,8 +43,6 @@ INSERT INTO t1n VALUES (1,1), (1,2), (2,1), (2,2);
UPDATE t1m, t1b SET m = 2, b = 3 WHERE n = c; UPDATE t1m, t1b SET m = 2, b = 3 WHERE n = c;
UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f; UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f;
ERROR HY000: Binary logging not possible. Message: Statement cannot be written atomically since more than one engine involved and at least one engine is self-logging ERROR HY000: Binary logging not possible. Message: Statement cannot be written atomically since more than one engine involved and at least one engine is self-logging
UPDATE t1n, t1b SET e = 2, b = 3 WHERE f = c;
ERROR HY000: Binary logging not possible. Message: Statement cannot be written atomically since more than one engine involved and at least one engine is self-logging
TRUNCATE t1m; TRUNCATE t1m;
TRUNCATE t1b; TRUNCATE t1b;
TRUNCATE t1n; TRUNCATE t1n;
...@@ -68,20 +66,21 @@ RESET MASTER; ...@@ -68,20 +66,21 @@ RESET MASTER;
SET SESSION BINLOG_FORMAT=ROW; SET SESSION BINLOG_FORMAT=ROW;
INSERT INTO t1m VALUES (1,1), (1,2), (2,1), (2,2); INSERT INTO t1m VALUES (1,1), (1,2), (2,1), (2,2);
INSERT INTO t1b VALUES (1,1), (1,2), (2,1), (2,2); INSERT INTO t1b VALUES (1,1), (1,2), (2,1), (2,2);
ERROR HY000: Binary logging not possible. Message: Row-based format required for this statement, but not allowed by this combination of engines
INSERT INTO t1n VALUES (1,1), (1,2), (2,1), (2,2); INSERT INTO t1n VALUES (1,1), (1,2), (2,1), (2,2);
UPDATE t1m, t1b SET m = 2, b = 3 WHERE n = c;
ERROR HY000: Binary logging not possible. Message: Row-based format required for this statement, but not allowed by this combination of engines
UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f; UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f;
ERROR HY000: Binary logging not possible. Message: Statement cannot be written atomically since more than one engine involved and at least one engine is self-logging ERROR HY000: Binary logging not possible. Message: Statement cannot be written atomically since more than one engine involved and at least one engine is self-logging
UPDATE t1n, t1b SET e = 2, b = 3 WHERE f = c; UPDATE t1n, t1b SET e = 2, b = 3 WHERE f = c;
ERROR HY000: Binary logging not possible. Message: Row-based format required for this statement, but not allowed by this combination of engines ERROR HY000: Binary logging not possible. Message: Statement cannot be written atomically since more than one engine involved and at least one engine is self-logging
show binlog events from <binlog_start>; show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query # # use `test`; BEGIN master-bin.000001 # Query # # use `test`; BEGIN
master-bin.000001 # Table_map # # table_id: # (test.t1m) master-bin.000001 # Table_map # # table_id: # (test.t1m)
master-bin.000001 # Write_rows # # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows # # table_id: # flags: STMT_END_F
master-bin.000001 # Query # # use `test`; COMMIT master-bin.000001 # Query # # use `test`; COMMIT
master-bin.000001 # Query # # use `test`; BEGIN
master-bin.000001 # Table_map # # table_id: # (test.t1b)
master-bin.000001 # Write_rows # # table_id: # flags: STMT_END_F
master-bin.000001 # Query # # use `test`; COMMIT
master-bin.000001 # Query # # BEGIN master-bin.000001 # Query # # BEGIN
master-bin.000001 # Table_map # # table_id: # (test.t1n) master-bin.000001 # Table_map # # table_id: # (test.t1n)
master-bin.000001 # Table_map # # table_id: # (mysql.ndb_apply_status) master-bin.000001 # Table_map # # table_id: # (mysql.ndb_apply_status)
......
...@@ -141,7 +141,6 @@ master-bin.000001 # Query # # use `test`; COMMIT ...@@ -141,7 +141,6 @@ master-bin.000001 # Query # # use `test`; COMMIT
drop table t1,t2,t3; drop table t1,t2,t3;
CREATE TABLE t1(a INT) ENGINE=BLACKHOLE; CREATE TABLE t1(a INT) ENGINE=BLACKHOLE;
INSERT DELAYED INTO t1 VALUES(1); INSERT DELAYED INTO t1 VALUES(1);
ERROR HY000: Binary logging not possible. Message: Row-based format required for this statement, but not allowed by this combination of engines
DROP TABLE t1; DROP TABLE t1;
CREATE TABLE t1(a INT, b INT) ENGINE=BLACKHOLE; CREATE TABLE t1(a INT, b INT) ENGINE=BLACKHOLE;
DELETE FROM t1 WHERE a=10; DELETE FROM t1 WHERE a=10;
......
...@@ -69,9 +69,6 @@ UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f; ...@@ -69,9 +69,6 @@ UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f;
#UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f; #UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f;
error ER_BINLOG_LOGGING_IMPOSSIBLE;
UPDATE t1n, t1b SET e = 2, b = 3 WHERE f = c;
TRUNCATE t1m; TRUNCATE t1m;
TRUNCATE t1b; TRUNCATE t1b;
TRUNCATE t1n; TRUNCATE t1n;
...@@ -83,12 +80,10 @@ RESET MASTER; ...@@ -83,12 +80,10 @@ RESET MASTER;
SET SESSION BINLOG_FORMAT=ROW; SET SESSION BINLOG_FORMAT=ROW;
INSERT INTO t1m VALUES (1,1), (1,2), (2,1), (2,2); INSERT INTO t1m VALUES (1,1), (1,2), (2,1), (2,2);
error ER_BINLOG_LOGGING_IMPOSSIBLE;
INSERT INTO t1b VALUES (1,1), (1,2), (2,1), (2,2); INSERT INTO t1b VALUES (1,1), (1,2), (2,1), (2,2);
INSERT INTO t1n VALUES (1,1), (1,2), (2,1), (2,2); INSERT INTO t1n VALUES (1,1), (1,2), (2,1), (2,2);
error ER_BINLOG_LOGGING_IMPOSSIBLE;
UPDATE t1m, t1b SET m = 2, b = 3 WHERE n = c;
error ER_BINLOG_LOGGING_IMPOSSIBLE; error ER_BINLOG_LOGGING_IMPOSSIBLE;
UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f; UPDATE t1m, t1n SET m = 2, e = 3 WHERE n = f;
......
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
CREATE TABLE t1 (a INT, b INT, c INT);
CREATE TABLE t2 (a INT, b INT, c INT);
ALTER TABLE t1 ENGINE=BLACKHOLE;
INSERT INTO t2 VALUES (1,9,1), (2,9,2), (3,9,3), (4,9,4);
[on master]
INSERT INTO t1 VALUES (1,1,1),(2,1,2),(3,1,3),(4,1,4);
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
UPDATE t1 SET c = 2*c WHERE a % 2 = 0 AND b = 1;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
DELETE FROM t1 WHERE a % 2 = 0 AND b = 1;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
INSERT INTO t1 SELECT * FROM t2;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
INSERT INTO t2 SELECT * FROM t1;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
ALTER TABLE t1 ADD PRIMARY KEY pk_t1 (a,b);
[on master]
INSERT INTO t1 VALUES (1,2,1),(2,2,2),(3,2,3),(4,2,4);
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
UPDATE t1 SET c = 2*c WHERE a % 2 = 0 AND b = 2;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
DELETE FROM t1 WHERE a % 2 = 0 AND b = 2;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
ALTER TABLE t1 DROP PRIMARY KEY, ADD KEY key_t1 (a);
[on master]
INSERT INTO t1 VALUES (1,3,1),(2,3,2),(3,3,3),(4,3,4);
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
UPDATE t1 SET c = 2*c WHERE a % 2 = 0 AND b = 3;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
[on master]
DELETE FROM t1 WHERE a % 2 = 0 AND b = 3;
[on slave]
# Expect 0
SELECT COUNT(*) FROM t1;
COUNT(*)
0
>>> Something was written to binary log <<<
--source include/master-slave.inc
# Test to test that blackhole works with replication, all three
# modes. We start by creating a table on the master and then change
# the engine to use blackhole on the slave.
# We start with no primary key
CREATE TABLE t1 (a INT, b INT, c INT);
CREATE TABLE t2 (a INT, b INT, c INT);
sync_slave_with_master;
ALTER TABLE t1 ENGINE=BLACKHOLE;
connection master;
INSERT INTO t2 VALUES (1,9,1), (2,9,2), (3,9,3), (4,9,4);
sync_slave_with_master;
# Test insert, no primary key
let $statement = INSERT INTO t1 VALUES (1,1,1),(2,1,2),(3,1,3),(4,1,4);
source extra/rpl_tests/rpl_blackhole.test;
# Test update, no primary key
let $statement = UPDATE t1 SET c = 2*c WHERE a % 2 = 0 AND b = 1;
source extra/rpl_tests/rpl_blackhole.test;
# Test delete, no primary key
let $statement = DELETE FROM t1 WHERE a % 2 = 0 AND b = 1;
source extra/rpl_tests/rpl_blackhole.test;
# Test INSERT-SELECT into Blackhole, no primary key
let $statement = INSERT INTO t1 SELECT * FROM t2;
source extra/rpl_tests/rpl_blackhole.test;
# Test INSERT-SELECT from Blackhole, no primary key
let $statement = INSERT INTO t2 SELECT * FROM t1;
source extra/rpl_tests/rpl_blackhole.test;
connection master;
ALTER TABLE t1 ADD PRIMARY KEY pk_t1 (a,b);
# Test insert, primary key
let $statement = INSERT INTO t1 VALUES (1,2,1),(2,2,2),(3,2,3),(4,2,4);
source extra/rpl_tests/rpl_blackhole.test;
# Test update, primary key
let $statement = UPDATE t1 SET c = 2*c WHERE a % 2 = 0 AND b = 2;
source extra/rpl_tests/rpl_blackhole.test;
# Test delete, primary key
let $statement = DELETE FROM t1 WHERE a % 2 = 0 AND b = 2;
source extra/rpl_tests/rpl_blackhole.test;
connection master;
ALTER TABLE t1 DROP PRIMARY KEY, ADD KEY key_t1 (a);
# Test insert, key
let $statement = INSERT INTO t1 VALUES (1,3,1),(2,3,2),(3,3,3),(4,3,4);
source extra/rpl_tests/rpl_blackhole.test;
# Test update, key
let $statement = UPDATE t1 SET c = 2*c WHERE a % 2 = 0 AND b = 3;
source extra/rpl_tests/rpl_blackhole.test;
# Test delete, key
let $statement = DELETE FROM t1 WHERE a % 2 = 0 AND b = 3;
source extra/rpl_tests/rpl_blackhole.test;
...@@ -8605,10 +8605,10 @@ int Rows_log_event::find_row(const Relay_log_info *rli) ...@@ -8605,10 +8605,10 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
the necessary bits on the bytes and don't set the filler bits the necessary bits on the bytes and don't set the filler bits
correctly. correctly.
*/ */
my_ptrdiff_t const pos= if (table->s->null_bytes > 0)
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; table->record[0][table->s->null_bytes - 1]|=
table->record[0][pos]= 0xFF; 256U - (1U << table->s->last_null_bit_pos);
if ((error= table->file->index_read_map(table->record[0], m_key, if ((error= table->file->index_read_map(table->record[0], m_key,
HA_WHOLE_KEY, HA_WHOLE_KEY,
HA_READ_KEY_EXACT))) HA_READ_KEY_EXACT)))
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#pragma implementation // gcc: Class implementation #pragma implementation // gcc: Class implementation
#endif #endif
#define MYSQL_SERVER 1
#include "mysql_priv.h" #include "mysql_priv.h"
#include "ha_blackhole.h" #include "ha_blackhole.h"
...@@ -100,6 +101,24 @@ int ha_blackhole::write_row(uchar * buf) ...@@ -100,6 +101,24 @@ int ha_blackhole::write_row(uchar * buf)
DBUG_RETURN(table->next_number_field ? update_auto_increment() : 0); DBUG_RETURN(table->next_number_field ? update_auto_increment() : 0);
} }
int ha_blackhole::update_row(const uchar *old_data, uchar *new_data)
{
DBUG_ENTER("ha_blackhole::update_row");
THD *thd= ha_thd();
if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
int ha_blackhole::delete_row(const uchar *buf)
{
DBUG_ENTER("ha_blackhole::delete_row");
THD *thd= ha_thd();
if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
int ha_blackhole::rnd_init(bool scan) int ha_blackhole::rnd_init(bool scan)
{ {
DBUG_ENTER("ha_blackhole::rnd_init"); DBUG_ENTER("ha_blackhole::rnd_init");
...@@ -110,6 +129,9 @@ int ha_blackhole::rnd_init(bool scan) ...@@ -110,6 +129,9 @@ int ha_blackhole::rnd_init(bool scan)
int ha_blackhole::rnd_next(uchar *buf) int ha_blackhole::rnd_next(uchar *buf)
{ {
DBUG_ENTER("ha_blackhole::rnd_next"); DBUG_ENTER("ha_blackhole::rnd_next");
THD *thd= ha_thd();
if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
} }
...@@ -189,6 +211,9 @@ int ha_blackhole::index_read_map(uchar * buf, const uchar * key, ...@@ -189,6 +211,9 @@ int ha_blackhole::index_read_map(uchar * buf, const uchar * key,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
DBUG_ENTER("ha_blackhole::index_read"); DBUG_ENTER("ha_blackhole::index_read");
THD *thd= ha_thd();
if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
} }
...@@ -198,6 +223,9 @@ int ha_blackhole::index_read_idx_map(uchar * buf, uint idx, const uchar * key, ...@@ -198,6 +223,9 @@ int ha_blackhole::index_read_idx_map(uchar * buf, uint idx, const uchar * key,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
DBUG_ENTER("ha_blackhole::index_read_idx"); DBUG_ENTER("ha_blackhole::index_read_idx");
THD *thd= ha_thd();
if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
} }
...@@ -206,6 +234,9 @@ int ha_blackhole::index_read_last_map(uchar * buf, const uchar * key, ...@@ -206,6 +234,9 @@ int ha_blackhole::index_read_last_map(uchar * buf, const uchar * key,
key_part_map keypart_map) key_part_map keypart_map)
{ {
DBUG_ENTER("ha_blackhole::index_read_last"); DBUG_ENTER("ha_blackhole::index_read_last");
THD *thd= ha_thd();
if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
} }
......
...@@ -53,7 +53,7 @@ public: ...@@ -53,7 +53,7 @@ public:
ulonglong table_flags() const ulonglong table_flags() const
{ {
return(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER | return(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER |
HA_BINLOG_STMT_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE |
HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY |
HA_FILE_BASED | HA_CAN_GEOMETRY | HA_CAN_INSERT_DELAYED); HA_FILE_BASED | HA_CAN_GEOMETRY | HA_CAN_INSERT_DELAYED);
} }
...@@ -72,7 +72,6 @@ public: ...@@ -72,7 +72,6 @@ public:
uint max_supported_key_part_length() const { return BLACKHOLE_MAX_KEY_LENGTH; } uint max_supported_key_part_length() const { return BLACKHOLE_MAX_KEY_LENGTH; }
int open(const char *name, int mode, uint test_if_locked); int open(const char *name, int mode, uint test_if_locked);
int close(void); int close(void);
int write_row(uchar * buf);
int rnd_init(bool scan); int rnd_init(bool scan);
int rnd_next(uchar *buf); int rnd_next(uchar *buf);
int rnd_pos(uchar * buf, uchar *pos); int rnd_pos(uchar * buf, uchar *pos);
...@@ -94,4 +93,8 @@ public: ...@@ -94,4 +93,8 @@ public:
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **store_lock(THD *thd,
THR_LOCK_DATA **to, THR_LOCK_DATA **to,
enum thr_lock_type lock_type); enum thr_lock_type lock_type);
private:
virtual int write_row(uchar *buf);
virtual int update_row(const uchar *old_data, uchar *new_data);
virtual int delete_row(const uchar *buf);
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment