Commit 575dd775 authored by Nirbhay Choubey's avatar Nirbhay Choubey

MDEV-7867: Add binlog header to GRA_.log file

parent cbc5157f
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include "wsrep_priv.h" #include "wsrep_priv.h"
#include "wsrep_binlog.h" // wsrep_dump_rbr_buf() #include "wsrep_binlog.h"
#include "log_event.h" // EVENT_LEN_OFFSET, etc. #include "log_event.h" // EVENT_LEN_OFFSET, etc.
#include "wsrep_applier.h" #include "wsrep_applier.h"
...@@ -62,8 +62,7 @@ err: ...@@ -62,8 +62,7 @@ err:
#include "rpl_rli.h" // class Relay_log_info; #include "rpl_rli.h" // class Relay_log_info;
#include "sql_base.h" // close_temporary_table() #include "sql_base.h" // close_temporary_table()
static inline void void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
{ {
if (thd->wsrep_apply_format) if (thd->wsrep_apply_format)
{ {
...@@ -72,8 +71,7 @@ wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) ...@@ -72,8 +71,7 @@ wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
thd->wsrep_apply_format= ev; thd->wsrep_apply_format= ev;
} }
static inline Format_description_log_event* Format_description_log_event* wsrep_get_apply_format(THD* thd)
wsrep_get_apply_format(THD* thd)
{ {
if (thd->wsrep_apply_format) if (thd->wsrep_apply_format)
return (Format_description_log_event*) thd->wsrep_apply_format; return (Format_description_log_event*) thd->wsrep_apply_format;
...@@ -260,7 +258,7 @@ wsrep_cb_status_t wsrep_apply_cb(void* const ctx, ...@@ -260,7 +258,7 @@ wsrep_cb_status_t wsrep_apply_cb(void* const ctx,
if (WSREP_CB_SUCCESS != rcode) if (WSREP_CB_SUCCESS != rcode)
{ {
wsrep_dump_rbr_buf(thd, buf, buf_len); wsrep_dump_rbr_buf_with_header(thd, buf, buf_len);
} }
TABLE *tmp; TABLE *tmp;
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#include <sys/types.h> #include <sys/types.h>
void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev);
Format_description_log_event* wsrep_get_apply_format(THD* thd);
/* wsrep callback prototypes */ /* wsrep callback prototypes */
wsrep_cb_status_t wsrep_apply_cb(void *ctx, wsrep_cb_status_t wsrep_apply_cb(void *ctx,
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include "wsrep_binlog.h" #include "wsrep_binlog.h"
#include "wsrep_priv.h" #include "wsrep_priv.h"
#include "log.h" #include "log.h"
#include "log_event.h"
#include "wsrep_applier.h"
extern handlerton *binlog_hton; extern handlerton *binlog_hton;
/* /*
...@@ -212,7 +214,10 @@ cleanup: ...@@ -212,7 +214,10 @@ cleanup:
WSREP_ERROR("failed to reinitialize io-cache"); WSREP_ERROR("failed to reinitialize io-cache");
} }
if (unlikely(WSREP_OK != err)) wsrep_dump_rbr_buf(thd, buf, used); if (unlikely(WSREP_OK != err))
{
wsrep_dump_rbr_buf_with_header(thd, buf, used);
}
my_free(heap_buf); my_free(heap_buf);
return err; return err;
...@@ -352,6 +357,7 @@ int wsrep_binlog_savepoint_rollback(THD *thd, void *sv) ...@@ -352,6 +357,7 @@ int wsrep_binlog_savepoint_rollback(THD *thd, void *sv)
return rcode; return rcode;
} }
#if 0
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache) void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
{ {
char filename[PATH_MAX]= {0}; char filename[PATH_MAX]= {0};
...@@ -407,8 +413,62 @@ cleanup: ...@@ -407,8 +413,62 @@ cleanup:
// close file // close file
if (of) fclose(of); if (of) fclose(of);
} }
#endif
void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
{ {
thd->binlog_flush_pending_rows_event(stmt_end); thd->binlog_flush_pending_rows_event(stmt_end);
} }
/* Dump replication buffer along with header to a file. */
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
size_t buf_len)
{
char filename[PATH_MAX]= {0};
File file;
IO_CACHE cache;
Format_description_log_event *ev= wsrep_get_apply_format(thd);
int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log",
wsrep_data_home_dir, thd->thread_id,
(long long) wsrep_thd_trx_seqno(thd));
if (len >= PATH_MAX)
{
WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
return;
}
if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
{
WSREP_ERROR("Failed to open file '%s' : %d (%s)",
filename, errno, strerror(errno));
goto cleanup1;
}
if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
{
mysql_file_close(file, MYF(MY_WME));
goto cleanup2;
}
if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
{
goto cleanup2;
}
if (ev->write(&cache) || my_b_write(&cache, rbr_buf, buf_len) ||
flush_io_cache(&cache))
{
WSREP_ERROR("Failed to write to '%s'.", filename);
goto cleanup2;
}
cleanup2:
end_io_cache(&cache);
cleanup1:
mysql_file_close(file, MYF(MY_WME));
}
...@@ -49,11 +49,12 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); ...@@ -49,11 +49,12 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
/* Dump replication buffer to disk without intermediate buffer */ /* Dump replication buffer to disk without intermediate buffer */
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache); void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache);
/* Dump replication buffer along with header to a file */
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
size_t buf_len);
int wsrep_binlog_close_connection(THD* thd); int wsrep_binlog_close_connection(THD* thd);
int wsrep_binlog_savepoint_set(THD *thd, void *sv); int wsrep_binlog_savepoint_set(THD *thd, void *sv);
int wsrep_binlog_savepoint_rollback(THD *thd, void *sv); int wsrep_binlog_savepoint_rollback(THD *thd, void *sv);
/* Dump replication buffer to disk without intermediate buffer */
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache);
#endif /* WSREP_BINLOG_H */ #endif /* WSREP_BINLOG_H */
...@@ -133,6 +133,8 @@ PSI_cond_key key_COND_wsrep_rollback, ...@@ -133,6 +133,8 @@ PSI_cond_key key_COND_wsrep_rollback,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread; key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
PSI_file_key key_file_wsrep_gra_log;
static PSI_mutex_info wsrep_mutexes[]= static PSI_mutex_info wsrep_mutexes[]=
{ {
{ &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
...@@ -157,6 +159,12 @@ static PSI_cond_info wsrep_conds[]= ...@@ -157,6 +159,12 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL} { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
}; };
static PSI_file_info wsrep_files[]=
{
{ &key_file_wsrep_gra_log, "wsrep_gra_log", 0}
};
#else #else
#define mysql_mutex_register(X,Y,Z) #define mysql_mutex_register(X,Y,Z)
#define mysql_cond_register(X,Y,Z) #define mysql_cond_register(X,Y,Z)
...@@ -626,6 +634,8 @@ int wsrep_init() ...@@ -626,6 +634,8 @@ int wsrep_init()
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
mysql_file_register("sql", wsrep_files, array_elements(wsrep_files));
wsrep_ready_set(FALSE); wsrep_ready_set(FALSE);
assert(wsrep_provider); assert(wsrep_provider);
......
...@@ -262,6 +262,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying; ...@@ -262,6 +262,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_desync; extern PSI_mutex_key key_LOCK_wsrep_desync;
extern PSI_file_key key_file_wsrep_gra_log;
#endif /* HAVE_PSI_INTERFACE */ #endif /* HAVE_PSI_INTERFACE */
struct TABLE_LIST; struct TABLE_LIST;
int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment