Commit 64923bb6 authored by unknown's avatar unknown

MDEV-6156: Parallel replication incorrectly caches charset between worker threads

The previous patch for this bug was unfortunately completely wrong.

The purpose of cached_charset is to remember which character set we
have installed currently in the THD, so that in the common case where
charset does not change between queries, we do not need to update it
in the THD. Thus, it is important that the cached_charset field is
tightly coupled to the THD for which it handles caching.

Thus the right place to put cached_charset seems to be in the THD.
This patch introduces a field THD:system_thread_info where such info
in general can be placed without further inflating the THD with unused
data for other threads (THD is already far too big as it is). It then
moves the cached_charset into this slot for the SQL driver thread and
for the parallel replication worker threads.

The THD::rpl_filter field is also moved inside system_thread_info, to
keep the size of THD unchanged. Moving further fields in to reduce the
size of THD is a separate task, filed as MDEV-6164.
parent d3d3a4b8
......@@ -4153,7 +4153,8 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
(sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
if (charset_inited)
{
if (rgi->cached_charset_compare(charset))
rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
if (sql_info->cached_charset_compare(charset))
{
/* Verify that we support the charsets found in the event. */
if (!(thd->variables.character_set_client=
......
......@@ -216,6 +216,16 @@ public:
bool stop_all_slaves(THD *thd);
};
/*
The class rpl_io_thread_info is the THD::system_thread_info for the IO thread.
*/
class rpl_io_thread_info
{
public:
};
bool check_master_connection_name(LEX_STRING *name);
void create_logfile_name_with_suffix(char *res_file_name, size_t length,
const char *info_file,
......
......@@ -33,7 +33,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
THD *thd= rgi->thd;
thd->rgi_slave= rgi;
thd->rpl_filter = rli->mi->rpl_filter;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
......@@ -212,6 +212,7 @@ handle_rpl_parallel_thread(void *arg)
rpl_parallel_thread::queued_event *qevs_to_free;
rpl_group_info *rgis_to_free;
group_commit_orderer *gcos_to_free;
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
......@@ -242,6 +243,7 @@ handle_rpl_parallel_thread(void *arg)
thd_proc_info(thd, "Waiting for work from main SQL threads");
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
/*
For now, we need to run the replication parallel worker threads in
READ COMMITTED. This is needed because gap locks are not symmetric.
......
......@@ -1479,7 +1479,6 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli)
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
reinit(rli);
cached_charset_invalidate();
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST);
......@@ -1562,29 +1561,6 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
}
void rpl_group_info::cached_charset_invalidate()
{
DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
/* Full of zeroes means uninitialized. */
bzero(cached_charset, sizeof(cached_charset));
DBUG_VOID_RETURN;
}
bool rpl_group_info::cached_charset_compare(char *charset) const
{
DBUG_ENTER("rpl_group_info::cached_charset_compare");
if (memcmp(cached_charset, charset, sizeof(cached_charset)))
{
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
......@@ -1769,4 +1745,33 @@ rpl_group_info::mark_start_commit()
}
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{
cached_charset_invalidate();
}
void rpl_sql_thread_info::cached_charset_invalidate()
{
DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
/* Full of zeroes means uninitialized. */
bzero(cached_charset, sizeof(cached_charset));
DBUG_VOID_RETURN;
}
bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
{
DBUG_ENTER("rpl_group_info::cached_charset_compare");
if (memcmp(cached_charset, charset, sizeof(cached_charset)))
{
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
#endif
......@@ -26,6 +26,7 @@
struct RPL_TABLE_LIST;
class Master_info;
class Rpl_filter;
/****************************************************************************
......@@ -536,8 +537,6 @@ struct rpl_group_info
mysql_mutex_t sleep_lock;
mysql_cond_t sleep_cond;
char cached_charset[6];
/*
trans_retries varies between 0 to slave_transaction_retries and counts how
many times the slave has retried the present transaction; gets reset to 0
......@@ -671,15 +670,6 @@ struct rpl_group_info
return false;
}
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
the thread save 3 get_charset() per Query_log_event if the charset is not
changing from event to event (common situation).
When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
......@@ -727,6 +717,30 @@ struct rpl_group_info
};
/*
The class rpl_sql_thread_info is the THD::system_thread_info for an SQL
thread; this is either the driver SQL thread or a worker thread for parallel
replication.
*/
class rpl_sql_thread_info
{
public:
char cached_charset[6];
Rpl_filter* rpl_filter;
rpl_sql_thread_info(Rpl_filter *filter);
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
the thread save 3 get_charset() per Query_log_event if the charset is not
changing from event to event (common situation).
When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
};
// Defined in rpl_rli.cc
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
......
......@@ -2891,7 +2891,7 @@ void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi)
global_system_variables.collation_server;
thd->update_charset();
rgi->cached_charset_invalidate();
thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
DBUG_VOID_RETURN;
}
......@@ -3768,6 +3768,7 @@ pthread_handler_t handle_slave_io(void *arg)
uint retry_count;
bool suppress_warnings;
int ret;
rpl_io_thread_info io_info;
#ifndef DBUG_OFF
uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
#endif
......@@ -3801,6 +3802,7 @@ pthread_handler_t handle_slave_io(void *arg)
sql_print_error("Failed during slave I/O thread initialization");
goto err_during_init;
}
thd->system_thread_info.rpl_io_info= &io_info;
mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
......@@ -4367,6 +4369,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Relay_log_info* rli = &mi->rli;
const char *errmsg;
rpl_group_info *serial_rgi;
rpl_sql_thread_info sql_info(mi->rpl_filter);
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
......@@ -4378,7 +4381,7 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi= new rpl_group_info(rli);
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->rpl_filter = mi->rpl_filter;
thd->system_thread_info.rpl_sql_info= &sql_info;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
......@@ -4676,7 +4679,7 @@ err_during_init:
mysql_cond_broadcast(&rli->data_cond);
rli->ignore_log_space_limit= 0; /* don't need any lock */
/* we die so won't remember charset - re-update them on next thread start */
serial_rgi->cached_charset_invalidate();
thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
/*
TODO: see if we can do this conditionally in next_event() instead
......
......@@ -38,6 +38,7 @@
#include "records.h" // READ_RECORD, read_record_info,
// init_read_record, end_read_record
#include "rpl_filter.h" // rpl_filter
#include "rpl_rli.h"
#include <m_ctype.h>
#include <stdarg.h>
#include "sp_head.h"
......@@ -2558,7 +2559,7 @@ bool change_password(THD *thd, const char *host, const char *user,
{
TABLE_LIST tables;
TABLE *table;
Rpl_filter *rpl_filter= thd->rpl_filter;
Rpl_filter *rpl_filter;
/* Buffer should be extended when password length is extended. */
char buff[512];
ulong query_length;
......@@ -2580,7 +2581,8 @@ bool change_password(THD *thd, const char *host, const char *user,
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
if (thd->slave_thread && rpl_filter->is_on())
if (thd->slave_thread &&
(rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
......@@ -5393,7 +5395,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list,
TABLE_LIST tables[3];
bool create_new_users=0;
char *db_name, *table_name;
Rpl_filter *rpl_filter= thd->rpl_filter;
Rpl_filter *rpl_filter;
DBUG_ENTER("mysql_table_grant");
if (!initialized)
......@@ -5483,7 +5485,8 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list,
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
if (thd->slave_thread && rpl_filter->is_on())
if (thd->slave_thread &&
(rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
......@@ -5670,7 +5673,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc,
TABLE_LIST tables[2];
bool create_new_users=0, result=0;
char *db_name, *table_name;
Rpl_filter *rpl_filter= thd->rpl_filter;
Rpl_filter *rpl_filter;
DBUG_ENTER("mysql_routine_grant");
if (!initialized)
......@@ -5705,7 +5708,8 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc,
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
if (thd->slave_thread && rpl_filter->is_on())
if (thd->slave_thread &&
(rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
......@@ -6141,7 +6145,7 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list,
char tmp_db[SAFE_NAME_LEN+1];
bool create_new_users=0;
TABLE_LIST tables[2];
Rpl_filter *rpl_filter= thd->rpl_filter;
Rpl_filter *rpl_filter;
DBUG_ENTER("mysql_grant");
if (!initialized)
......@@ -6190,7 +6194,8 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list,
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
if (thd->slave_thread && rpl_filter->is_on())
if (thd->slave_thread &&
(rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
......@@ -8223,7 +8228,7 @@ void get_mqh(const char *user, const char *host, USER_CONN *uc)
#define GRANT_TABLES 7
static int open_grant_tables(THD *thd, TABLE_LIST *tables)
{
Rpl_filter *rpl_filter= thd->rpl_filter;
Rpl_filter *rpl_filter;
DBUG_ENTER("open_grant_tables");
if (!initialized)
......@@ -8267,7 +8272,8 @@ static int open_grant_tables(THD *thd, TABLE_LIST *tables)
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
if (thd->slave_thread && rpl_filter->is_on())
if (thd->slave_thread &&
(rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
......
......@@ -72,6 +72,8 @@ class Parser_state;
class Rows_log_event;
class Sroutine_hash_entry;
class user_var_entry;
class rpl_io_thread_info;
class rpl_sql_thread_info;
enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE };
......@@ -1810,8 +1812,10 @@ public:
/* Slave applier execution context */
rpl_group_info* rgi_slave;
/* Used to SLAVE SQL thread */
Rpl_filter* rpl_filter;
union {
rpl_io_thread_info *rpl_io_info;
rpl_sql_thread_info *rpl_sql_info;
} system_thread_info;
void reset_for_next_command();
/*
......
......@@ -171,8 +171,9 @@ const char *xa_state_names[]={
*/
inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
{
return thd->rpl_filter->is_on() && tables && !thd->spcont &&
!thd->rpl_filter->tables_ok(thd->db, tables);
Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
return rpl_filter->is_on() && tables && !thd->spcont &&
!rpl_filter->tables_ok(thd->db, tables);
}
#endif
......@@ -2233,7 +2234,7 @@ mysql_execute_command(THD *thd)
/* have table map for update for multi-update statement (BUG#37051) */
bool have_table_map_for_update= FALSE;
/* */
Rpl_filter *rpl_filter= thd->rpl_filter;
Rpl_filter *rpl_filter;
#endif
DBUG_ENTER("mysql_execute_command");
......@@ -3885,13 +3886,16 @@ end_with_restore_list:
above was not called. So we have to check rules again here.
*/
#ifdef HAVE_REPLICATION
if (thd->slave_thread &&
(!rpl_filter->db_ok(lex->name.str) ||
!rpl_filter->db_ok_with_wild_table(lex->name.str)))
if (thd->slave_thread)
{
rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
if (!rpl_filter->db_ok(lex->name.str) ||
!rpl_filter->db_ok_with_wild_table(lex->name.str))
{
my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
break;
}
}
#endif
if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
break;
......@@ -3913,13 +3917,16 @@ end_with_restore_list:
above was not called. So we have to check rules again here.
*/
#ifdef HAVE_REPLICATION
if (thd->slave_thread &&
(!rpl_filter->db_ok(lex->name.str) ||
!rpl_filter->db_ok_with_wild_table(lex->name.str)))
if (thd->slave_thread)
{
rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
if (!rpl_filter->db_ok(lex->name.str) ||
!rpl_filter->db_ok_with_wild_table(lex->name.str))
{
my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
break;
}
}
#endif
if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
break;
......@@ -3930,14 +3937,17 @@ end_with_restore_list:
{
LEX_STRING *db= & lex->name;
#ifdef HAVE_REPLICATION
if (thd->slave_thread &&
(!rpl_filter->db_ok(db->str) ||
!rpl_filter->db_ok_with_wild_table(db->str)))
if (thd->slave_thread)
{
rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
if (!rpl_filter->db_ok(db->str) ||
!rpl_filter->db_ok_with_wild_table(db->str))
{
res= 1;
my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
break;
}
}
#endif
if (check_db_name(db))
{
......@@ -3973,13 +3983,16 @@ end_with_restore_list:
above was not called. So we have to check rules again here.
*/
#ifdef HAVE_REPLICATION
if (thd->slave_thread &&
(!rpl_filter->db_ok(db->str) ||
!rpl_filter->db_ok_with_wild_table(db->str)))
if (thd->slave_thread)
{
rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
if (!rpl_filter->db_ok(db->str) ||
!rpl_filter->db_ok_with_wild_table(db->str))
{
my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
break;
}
}
#endif
if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))
break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment