Commit 070c147b authored by mats@romeo.(none)'s avatar mats@romeo.(none)

Merge romeo.(none):/home/bkroot/mysql-5.1-new-rpl

into  romeo.(none):/home/bk/b23171-mysql-5.1-new-rpl
parents f0936e94 82f3f318
...@@ -97,8 +97,8 @@ Replicate_Do_Table ...@@ -97,8 +97,8 @@ Replicate_Do_Table
Replicate_Ignore_Table <Replicate_Ignore_Table> Replicate_Ignore_Table <Replicate_Ignore_Table>
Replicate_Wild_Do_Table Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table Replicate_Wild_Ignore_Table
Last_Errno 146 Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1 Last_Error Unknown error
Skip_Counter 0 Skip_Counter 0
Exec_Master_Log_Pos <Exec_Master_Log_Pos> Exec_Master_Log_Pos <Exec_Master_Log_Pos>
Relay_Log_Space <Relay_Log_Space> Relay_Log_Space <Relay_Log_Space>
......
...@@ -122,7 +122,7 @@ Replicate_Do_Table ...@@ -122,7 +122,7 @@ Replicate_Do_Table
Replicate_Ignore_Table Replicate_Ignore_Table
Replicate_Wild_Do_Table Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table Replicate_Wild_Ignore_Table
Last_Errno 1364 Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0 Skip_Counter 0
Exec_Master_Log_Pos # Exec_Master_Log_Pos #
......
...@@ -122,7 +122,7 @@ Replicate_Do_Table ...@@ -122,7 +122,7 @@ Replicate_Do_Table
Replicate_Ignore_Table Replicate_Ignore_Table
Replicate_Wild_Do_Table Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table Replicate_Wild_Ignore_Table
Last_Errno 1364 Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0 Skip_Counter 0
Exec_Master_Log_Pos # Exec_Master_Log_Pos #
......
...@@ -1548,7 +1548,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) ...@@ -1548,7 +1548,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_ASSERT(mysql_bin_log.is_open());
if (all && trx_data->empty()) /*
The condition here has to be identical to the one inside
binlog_end_trans(), guarding the write of the transaction cache to
the binary log.
*/
if ((all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) &&
trx_data->empty())
{ {
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
trx_data->reset(); trx_data->reset();
...@@ -2499,7 +2505,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -2499,7 +2505,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* /*
Set 'created' to 0, so that in next relay logs this event does not Set 'created' to 0, so that in next relay logs this event does not
trigger cleaning actions on the slave in trigger cleaning actions on the slave in
Format_description_log_event::exec_event(). Format_description_log_event::apply_event_impl().
*/ */
description_event_for_queue->created= 0; description_event_for_queue->created= 0;
/* Don't set log_pos in event header */ /* Don't set log_pos in event header */
...@@ -3206,8 +3212,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock) ...@@ -3206,8 +3212,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
{ {
tc_log_page_waits++; tc_log_page_waits++;
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
while (prepared_xids) while (prepared_xids) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids); pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids);
} }
...@@ -5061,8 +5069,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) ...@@ -5061,8 +5069,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{ {
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0); DBUG_ASSERT(prepared_xids > 0);
if (--prepared_xids == 0) if (--prepared_xids == 0) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids); pthread_cond_signal(&COND_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids);
rotate_and_purge(0); // as ::write() did not rotate rotate_and_purge(0); // as ::write() did not rotate
} }
......
This diff is collapsed.
This diff is collapsed.
...@@ -29,14 +29,15 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, ...@@ -29,14 +29,15 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
st_relay_log_info::st_relay_log_info() st_relay_log_info::st_relay_log_info()
:no_storage(FALSE), info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), :no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0), cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0), ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0), tables_to_lock(0), tables_to_lock_count(0),
unsafe_to_stop_at(0) last_event_start_time(0)
{ {
DBUG_ENTER("st_relay_log_info::st_relay_log_info"); DBUG_ENTER("st_relay_log_info::st_relay_log_info");
...@@ -1001,6 +1002,22 @@ bool st_relay_log_info::is_until_satisfied() ...@@ -1001,6 +1002,22 @@ bool st_relay_log_info::is_until_satisfied()
log_pos= group_relay_log_pos; log_pos= group_relay_log_pos;
} }
#ifndef DBUG_OFF
{
char buf[32];
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
group_master_log_name, llstr(group_master_log_pos, buf)));
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
group_relay_log_name, llstr(group_relay_log_pos, buf)));
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
log_name, llstr(log_pos, buf)));
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
until_log_name, llstr(until_log_pos, buf)));
}
#endif
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{ {
/* /*
...@@ -1056,30 +1073,19 @@ void st_relay_log_info::cached_charset_invalidate() ...@@ -1056,30 +1073,19 @@ void st_relay_log_info::cached_charset_invalidate()
} }
bool st_relay_log_info::cached_charset_compare(char *charset) bool st_relay_log_info::cached_charset_compare(char *charset) const
{ {
DBUG_ENTER("st_relay_log_info::cached_charset_compare"); DBUG_ENTER("st_relay_log_info::cached_charset_compare");
if (bcmp(cached_charset, charset, sizeof(cached_charset))) if (bcmp(cached_charset, charset, sizeof(cached_charset)))
{ {
memcpy(cached_charset, charset, sizeof(cached_charset)); memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void st_relay_log_info::transaction_end(THD* thd)
{
DBUG_ENTER("st_relay_log_info::transaction_end");
/*
Nothing to do here right now.
*/
DBUG_VOID_RETURN;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
void st_relay_log_info::cleanup_context(THD *thd, bool error) void st_relay_log_info::cleanup_context(THD *thd, bool error)
{ {
...@@ -1106,7 +1112,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error) ...@@ -1106,7 +1112,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables(); m_table_map.clear_tables();
close_thread_tables(thd); close_thread_tables(thd);
clear_tables_to_lock(); clear_tables_to_lock();
unsafe_to_stop_at= 0; last_event_start_time= 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -58,6 +58,15 @@ typedef struct st_relay_log_info ...@@ -58,6 +58,15 @@ typedef struct st_relay_log_info
*/ */
bool no_storage; bool no_storage;
/*
If true, events with the same server id should be replicated. This
field is set on creation of a relay log info structure by copying
the value of ::replicate_same_server_id and can be overridden if
necessary. For example of when this is done, check sql_binlog.cc,
where the BINLOG statement can be used to execute "raw" events.
*/
bool replicate_same_server_id;
/*** The following variables can only be read when protect by data lock ****/ /*** The following variables can only be read when protect by data lock ****/
/* /*
...@@ -292,14 +301,19 @@ typedef struct st_relay_log_info ...@@ -292,14 +301,19 @@ typedef struct st_relay_log_info
When the 6 bytes are equal to 0 is used to mean "cache is invalidated". When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/ */
void cached_charset_invalidate(); void cached_charset_invalidate();
bool cached_charset_compare(char *charset); bool cached_charset_compare(char *charset) const;
void transaction_end(THD*);
void cleanup_context(THD *, bool); void cleanup_context(THD *, bool);
void clear_tables_to_lock(); void clear_tables_to_lock();
time_t unsafe_to_stop_at; /*
Used by row-based replication to detect that it should not stop at
this event, but give it a chance to send more events. The time
where the last event inside a group started is stored here. If the
variable is zero, we are not in a group (but may be in a
transaction).
*/
time_t last_event_start_time;
} RELAY_LOG_INFO; } RELAY_LOG_INFO;
......
...@@ -108,7 +108,7 @@ field_length_from_packed(enum_field_types const field_type, ...@@ -108,7 +108,7 @@ field_length_from_packed(enum_field_types const field_type,
*/ */
int int
table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table) table_def::compatible_with(RELAY_LOG_INFO const *rli_arg, TABLE *table)
const const
{ {
/* /*
...@@ -116,6 +116,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table) ...@@ -116,6 +116,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table)
*/ */
uint const cols_to_check= min(table->s->fields, size()); uint const cols_to_check= min(table->s->fields, size());
int error= 0; int error= 0;
RELAY_LOG_INFO const *rli= const_cast<RELAY_LOG_INFO*>(rli_arg);
TABLE_SHARE const *const tsh= table->s; TABLE_SHARE const *const tsh= table->s;
......
...@@ -117,7 +117,7 @@ public: ...@@ -117,7 +117,7 @@ public:
@retval 1 if the table definition is not compatible with @c table @retval 1 if the table definition is not compatible with @c table
@retval 0 if the table definition is compatible with @c table @retval 0 if the table definition is compatible with @c table
*/ */
int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const; int compatible_with(RELAY_LOG_INFO const *rli, TABLE *table) const;
private: private:
my_size_t m_size; // Number of elements in the types array my_size_t m_size; // Number of elements in the types array
......
This diff is collapsed.
...@@ -162,9 +162,9 @@ bool show_binlog_info(THD* thd); ...@@ -162,9 +162,9 @@ bool show_binlog_info(THD* thd);
bool rpl_master_has_bug(RELAY_LOG_INFO *rli, uint bug_id); bool rpl_master_has_bug(RELAY_LOG_INFO *rli, uint bug_id);
const char *print_slave_db_safe(const char *db); const char *print_slave_db_safe(const char *db);
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code); int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli, int error_code);
void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...) int err_code, const char* msg, ...)
ATTRIBUTE_FORMAT(printf, 4, 5); ATTRIBUTE_FORMAT(printf, 4, 5);
...@@ -182,7 +182,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos, ...@@ -182,7 +182,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg); const char** errmsg);
void set_slave_thread_options(THD* thd); void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli); void set_slave_thread_default_charset(THD *thd, RELAY_LOG_INFO const *rli);
void rotate_relay_log(MASTER_INFO* mi); void rotate_relay_log(MASTER_INFO* mi);
pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_io(void *arg);
......
...@@ -163,9 +163,17 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -163,9 +163,17 @@ void mysql_client_binlog_statement(THD* thd)
(ulong) uint4korr(bufptr+EVENT_LEN_OFFSET))); (ulong) uint4korr(bufptr+EVENT_LEN_OFFSET)));
#endif #endif
ev->thd= thd; ev->thd= thd;
if (IF_DBUG(int err= ) ev->exec_event(thd->rli_fake)) /*
We go directly to the application phase, since we don't need
to check if the event shall be skipped or not.
Neither do we have to update the log positions, since that is
not used at all: the rli_fake instance is used only for error
reporting.
*/
if (IF_DBUG(int err= ) ev->apply_event(thd->rli_fake))
{ {
DBUG_PRINT("error", ("exec_event() returned: %d", err)); DBUG_PRINT("info", ("apply_event() returned: %d", err));
/* /*
TODO: Maybe a better error message since the BINLOG statement TODO: Maybe a better error message since the BINLOG statement
now contains several events. now contains several events.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment