Commit 0b4e65e1 authored by lars@black.(none)'s avatar lars@black.(none)

Merge mysql.com:/home/bkroot/mysql-5.1-new-rpl

into  mysql.com:/home/bk/MERGE/mysql-5.1-merge
parents 84e4df33 1023aecb
...@@ -476,6 +476,30 @@ static bool check_database(const char *log_dbname) ...@@ -476,6 +476,30 @@ static bool check_database(const char *log_dbname)
} }
static int
write_event_header_and_base64(Log_event *ev, FILE *result_file,
PRINT_EVENT_INFO *print_event_info)
{
DBUG_ENTER("write_event_header_and_base64");
/* Write header and base64 output to cache */
IO_CACHE result_cache;
if (init_io_cache(&result_cache, -1, 0, WRITE_CACHE, 0L, FALSE,
MYF(MY_WME | MY_NABP)))
{
return 1;
}
ev->print_header(&result_cache, print_event_info, FALSE);
ev->print_base64(&result_cache, print_event_info, FALSE);
/* Read data from cache and write to result file */
my_b_copy_to_file(&result_cache, result_file);
end_io_cache(&result_cache);
DBUG_RETURN(0);
}
/* /*
Process an event Process an event
...@@ -538,18 +562,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -538,18 +562,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
print_event_info->base64_output= opt_base64_output; print_event_info->base64_output= opt_base64_output;
DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str()));
switch (ev_type) { switch (ev_type) {
case QUERY_EVENT: case QUERY_EVENT:
if (check_database(((Query_log_event*)ev)->db)) if (check_database(((Query_log_event*)ev)->db))
goto end; goto end;
if (opt_base64_output) if (opt_base64_output)
{ write_event_header_and_base64(ev, result_file, print_event_info);
ev->print_header(result_file, print_event_info);
ev->print_base64(result_file, print_event_info);
}
else else
ev->print(result_file, print_event_info); ev->print(result_file, print_event_info);
break; break;
case CREATE_FILE_EVENT: case CREATE_FILE_EVENT:
{ {
Create_file_log_event* ce= (Create_file_log_event*)ev; Create_file_log_event* ce= (Create_file_log_event*)ev;
...@@ -570,8 +594,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -570,8 +594,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
*/ */
if (opt_base64_output) if (opt_base64_output)
{ {
ce->print_header(result_file, print_event_info); write_event_header_and_base64(ce, result_file, print_event_info);
ce->print_base64(result_file, print_event_info);
} }
else else
ce->print(result_file, print_event_info, TRUE); ce->print(result_file, print_event_info, TRUE);
......
...@@ -39,7 +39,8 @@ int base64_encode(const void *src, size_t src_len, char *dst); ...@@ -39,7 +39,8 @@ int base64_encode(const void *src, size_t src_len, char *dst);
/* /*
Decode a base64 string into data Decode a base64 string into data
*/ */
int base64_decode(const char *src, size_t src_len, void *dst); int base64_decode(const char *src, size_t src_len,
void *dst, const char **end_ptr);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -517,6 +517,7 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); ...@@ -517,6 +517,7 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
(uint) (*(info)->current_pos - (info)->request_pos)) (uint) (*(info)->current_pos - (info)->request_pos))
/* tell write offset in the SEQ_APPEND cache */ /* tell write offset in the SEQ_APPEND cache */
int my_b_copy_to_file(IO_CACHE *cache, FILE *file);
my_off_t my_b_append_tell(IO_CACHE* info); my_off_t my_b_append_tell(IO_CACHE* info);
my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */ my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */
......
...@@ -359,15 +359,6 @@ show binlog events from 102; ...@@ -359,15 +359,6 @@ show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Query 1 # use `test`; BEGIN
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
`a` int(11) NOT NULL DEFAULT '0',
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2 master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
...@@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2 ...@@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2
master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Query 1 # use `test`; BEGIN
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
`a` int(11) NOT NULL DEFAULT '0',
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2 master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2
master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Xid 1 # COMMIT /* xid= */
master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Table_map 1 # table_id: # (test.t1)
......
...@@ -347,13 +347,13 @@ create view v3 (x,y,z) as select b, a, b from t1; ...@@ -347,13 +347,13 @@ create view v3 (x,y,z) as select b, a, b from t1;
create view v4 (x,y,z) as select c+1, b, a from t1; create view v4 (x,y,z) as select c+1, b, a from t1;
create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1; create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1;
# try insert to VIEW with fields duplicate # try insert to VIEW with fields duplicate
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v3 values (-60,4,30); insert into v3 values (-60,4,30);
# try insert to VIEW with expression in SELECT list # try insert to VIEW with expression in SELECT list
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v4 values (-60,4,30); insert into v4 values (-60,4,30);
# try insert to VIEW using temporary table algorithm # try insert to VIEW using temporary table algorithm
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v5 values (-60,4,30); insert into v5 values (-60,4,30);
insert into v1 values (-60,4,30); insert into v1 values (-60,4,30);
insert into v1 (z,y,x) values (50,6,-100); insert into v1 (z,y,x) values (50,6,-100);
...@@ -375,13 +375,13 @@ create view v3 (x,y,z) as select b, a, b from t1; ...@@ -375,13 +375,13 @@ create view v3 (x,y,z) as select b, a, b from t1;
create view v4 (x,y,z) as select c+1, b, a from t1; create view v4 (x,y,z) as select c+1, b, a from t1;
create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1; create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1;
# try insert to VIEW with fields duplicate # try insert to VIEW with fields duplicate
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v3 select c, b, a from t2; insert into v3 select c, b, a from t2;
# try insert to VIEW with expression in SELECT list # try insert to VIEW with expression in SELECT list
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v4 select c, b, a from t2; insert into v4 select c, b, a from t2;
# try insert to VIEW using temporary table algorithm # try insert to VIEW using temporary table algorithm
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v5 select c, b, a from t2; insert into v5 select c, b, a from t2;
insert into v1 select c, b, a from t2; insert into v1 select c, b, a from t2;
insert into v1 (z,y,x) select a+20,b+2,-100 from t2; insert into v1 (z,y,x) select a+20,b+2,-100 from t2;
...@@ -1249,14 +1249,14 @@ drop table t1; ...@@ -1249,14 +1249,14 @@ drop table t1;
# #
create table t1 (s1 smallint); create table t1 (s1 smallint);
create view v1 as select * from t1 where 20 < (select (s1) from t1); create view v1 as select * from t1 where 20 < (select (s1) from t1);
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v1 values (30); insert into v1 values (30);
create view v2 as select * from t1; create view v2 as select * from t1;
create view v3 as select * from t1 where 20 < (select (s1) from v2); create view v3 as select * from t1 where 20 < (select (s1) from v2);
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v3 values (30); insert into v3 values (30);
create view v4 as select * from v2 where 20 < (select (s1) from t1); create view v4 as select * from v2 where 20 < (select (s1) from t1);
-- error 1573 -- error ER_NON_INSERTABLE_TABLE
insert into v4 values (30); insert into v4 values (30);
drop view v4, v3, v2, v1; drop view v4, v3, v2, v1;
drop table t1; drop table t1;
...@@ -2861,7 +2861,7 @@ DROP TABLE t1; ...@@ -2861,7 +2861,7 @@ DROP TABLE t1;
# #
create table t1 (s1 int); create table t1 (s1 int);
create view v1 as select s1 as a, s1 as b from t1; create view v1 as select s1 as a, s1 as b from t1;
--error 1573 --error ER_NON_INSERTABLE_TABLE
insert into v1 values (1,1); insert into v1 values (1,1);
update v1 set a = 5; update v1 set a = 5;
drop view v1; drop view v1;
......
...@@ -125,44 +125,69 @@ pos(unsigned char c) ...@@ -125,44 +125,69 @@ pos(unsigned char c)
/* /*
Decode a base64 string Decode a base64 string
Note: We require that dst is pre-allocated to correct size. SYNOPSIS
See base64_needed_decoded_length(). base64_decode()
src Pointer to base64-encoded string
RETURN Number of bytes produced in dst or -1 in case of failure len Length of string at 'src'
dst Pointer to location where decoded data will be stored
end_ptr Pointer to variable that will refer to the character
after the end of the encoded data that were decoded. Can
be NULL.
DESCRIPTION
The base64-encoded data in the range ['src','*end_ptr') will be
decoded and stored starting at 'dst'. The decoding will stop
after 'len' characters have been read from 'src', or when padding
occurs in the base64-encoded data. In either case: if 'end_ptr' is
non-null, '*end_ptr' will be set to point to the character after
the last read character, even in the presence of error.
NOTE
We require that 'dst' is pre-allocated to correct size.
SEE ALSO
base64_needed_decoded_length().
RETURN VALUE
Number of bytes written at 'dst' or -1 in case of failure
*/ */
int int
base64_decode(const char *src, size_t size, void *dst) base64_decode(const char *const src_base, size_t const len,
void *dst, const char **end_ptr)
{ {
char b[3]; char b[3];
size_t i= 0; size_t i= 0;
char *dst_base= (char *)dst; char *dst_base= (char *)dst;
char const *src= src_base;
char *d= dst_base; char *d= dst_base;
size_t j; size_t j;
while (i < size) while (i < len)
{ {
unsigned c= 0; unsigned c= 0;
size_t mark= 0; size_t mark= 0;
SKIP_SPACE(src, i, size); SKIP_SPACE(src, i, len);
c += pos(*src++); c += pos(*src++);
c <<= 6; c <<= 6;
i++; i++;
SKIP_SPACE(src, i, size); SKIP_SPACE(src, i, len);
c += pos(*src++); c += pos(*src++);
c <<= 6; c <<= 6;
i++; i++;
SKIP_SPACE(src, i, size); SKIP_SPACE(src, i, len);
if (* src != '=') if (*src != '=')
c += pos(*src++); c += pos(*src++);
else else
{ {
i= size; src += 2; /* There should be two bytes padding */
i= len;
mark= 2; mark= 2;
c <<= 6; c <<= 6;
goto end; goto end;
...@@ -170,13 +195,14 @@ base64_decode(const char *src, size_t size, void *dst) ...@@ -170,13 +195,14 @@ base64_decode(const char *src, size_t size, void *dst)
c <<= 6; c <<= 6;
i++; i++;
SKIP_SPACE(src, i, size); SKIP_SPACE(src, i, len);
if (*src != '=') if (*src != '=')
c += pos(*src++); c += pos(*src++);
else else
{ {
i= size; src += 1; /* There should be one byte padding */
i= len;
mark= 1; mark= 1;
goto end; goto end;
} }
...@@ -191,11 +217,14 @@ base64_decode(const char *src, size_t size, void *dst) ...@@ -191,11 +217,14 @@ base64_decode(const char *src, size_t size, void *dst)
*d++= b[j]; *d++= b[j];
} }
if (i != size) if (end_ptr != NULL)
{ *end_ptr= src;
return -1;
} /*
return d - dst_base; The variable 'i' is set to 'len' when padding has been read, so it
does not actually reflect the number of bytes read from 'src'.
*/
return i != len ? -1 : d - dst_base;
} }
......
...@@ -24,6 +24,54 @@ ...@@ -24,6 +24,54 @@
#include <stdarg.h> #include <stdarg.h>
#include <m_ctype.h> #include <m_ctype.h>
/*
Copy contents of an IO_CACHE to a file.
SYNOPSIS
my_b_copy_to_file()
cache IO_CACHE to copy from
file File to copy to
DESCRIPTION
Copy the contents of the cache to the file. The cache will be
re-inited to a read cache and will read from the beginning of the
cache.
If a failure to write fully occurs, the cache is only copied
partially.
TODO
Make this function solid by handling partial reads from the cache
in a correct manner: it should be atomic.
RETURN VALUE
0 All OK
1 An error occured
*/
int
my_b_copy_to_file(IO_CACHE *cache, FILE *file)
{
byte buf[IO_SIZE];
uint bytes_in_cache;
DBUG_ENTER("my_b_copy_to_file");
/* Reinit the cache to read from the beginning of the cache */
if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
DBUG_RETURN(1);
bytes_in_cache= my_b_bytes_in_cache(cache);
while (bytes_in_cache > 0) {
uint const read_bytes= min(bytes_in_cache, sizeof(buf));
DBUG_PRINT("debug", ("Remaining %u bytes - Reading %u bytes",
bytes_in_cache, read_bytes));
if (my_b_read(cache, buf, read_bytes))
DBUG_RETURN(1);
if (my_fwrite(file, buf, read_bytes, MYF(MY_WME | MY_NABP)) == (uint) -1)
DBUG_RETURN(1);
bytes_in_cache -= read_bytes;
}
DBUG_RETURN(0);
}
my_off_t my_b_append_tell(IO_CACHE* info) my_off_t my_b_append_tell(IO_CACHE* info)
{ {
/* /*
......
...@@ -32,10 +32,22 @@ ...@@ -32,10 +32,22 @@
#include <mysql/plugin.h> #include <mysql/plugin.h>
/*
Define placement versions of operator new and operator delete since
we cannot be sure that the <new> include exists.
*/
inline void *operator new(size_t, void *ptr) { return ptr; }
inline void *operator new[](size_t, void *ptr) { return ptr; }
inline void operator delete(void*, void*) { /* Do nothing */ }
inline void operator delete[](void*, void*) { /* Do nothing */ }
/* max size of the log message */ /* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024 #define MAX_LOG_BUFFER_SIZE 1024
#define MAX_USER_HOST_SIZE 512 #define MAX_USER_HOST_SIZE 512
#define MAX_TIME_SIZE 32 #define MAX_TIME_SIZE 32
#define MY_OFF_T_UNDEF (~(my_off_t)0UL)
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
LOGGER logger; LOGGER logger;
...@@ -70,23 +82,92 @@ char *make_default_log_name(char *buff,const char* log_ext) ...@@ -70,23 +82,92 @@ char *make_default_log_name(char *buff,const char* log_ext)
} }
/* /*
This is a POD. Please keep it that way! Helper class to store binary log transaction data.
Don't add constructors, destructors, or virtual functions.
*/ */
struct binlog_trx_data { class binlog_trx_data {
public:
binlog_trx_data()
#ifdef HAVE_ROW_BASED_REPLICATION
: m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF)
#endif
{
trans_log.end_of_file= max_binlog_cache_size;
}
~binlog_trx_data()
{
#ifdef HAVE_ROW_BASED_REPLICATION
DBUG_ASSERT(pending() == NULL);
#endif
close_cached_file(&trans_log);
}
my_off_t position() const {
return my_b_tell(&trans_log);
}
bool empty() const bool empty() const
{ {
#ifdef HAVE_ROW_BASED_REPLICATION #ifdef HAVE_ROW_BASED_REPLICATION
return pending == NULL && my_b_tell(&trans_log) == 0; return pending() == NULL && my_b_tell(&trans_log) == 0;
#else #else
return my_b_tell(&trans_log) == 0; return my_b_tell(&trans_log) == 0;
#endif #endif
} }
binlog_trx_data() {}
/*
Truncate the transaction cache to a certain position. This
includes deleting the pending event.
*/
void truncate(my_off_t pos)
{
#ifdef HAVE_ROW_BASED_REPLICATION
delete pending();
set_pending(0);
#endif
reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
}
/*
Reset the entire contents of the transaction cache, emptying it
completely.
*/
void reset() {
if (!empty())
truncate(0);
#ifdef HAVE_ROW_BASED_REPLICATION
before_stmt_pos= MY_OFF_T_UNDEF;
#endif
trans_log.end_of_file= max_binlog_cache_size;
}
#ifdef HAVE_ROW_BASED_REPLICATION
Rows_log_event *pending() const
{
return m_pending;
}
void set_pending(Rows_log_event *const pending)
{
m_pending= pending;
}
#endif
IO_CACHE trans_log; // The transaction cache IO_CACHE trans_log; // The transaction cache
private:
#ifdef HAVE_ROW_BASED_REPLICATION #ifdef HAVE_ROW_BASED_REPLICATION
Rows_log_event *pending; // The pending binrows event /*
Pending binrows event. This event is the event where the rows are
currently written.
*/
Rows_log_event *m_pending;
public:
/*
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
#endif #endif
}; };
...@@ -1149,6 +1230,69 @@ void Log_to_csv_event_handler:: ...@@ -1149,6 +1230,69 @@ void Log_to_csv_event_handler::
} }
/*
Save position of binary log transaction cache.
SYNPOSIS
binlog_trans_log_savepos()
thd The thread to take the binlog data from
pos Pointer to variable where the position will be stored
DESCRIPTION
Save the current position in the binary log transaction cache into
the variable pointed to by 'pos'
*/
static void
binlog_trans_log_savepos(THD *thd, my_off_t *pos)
{
DBUG_ENTER("binlog_trans_log_savepos");
DBUG_ASSERT(pos != NULL);
if (thd->ha_data[binlog_hton->slot] == NULL)
thd->binlog_setup_trx_data();
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
DBUG_ASSERT(mysql_bin_log.is_open());
*pos= trx_data->position();
DBUG_PRINT("return", ("*pos=%u", *pos));
DBUG_VOID_RETURN;
}
/*
Truncate the binary log transaction cache.
SYNPOSIS
binlog_trans_log_truncate()
thd The thread to take the binlog data from
pos Position to truncate to
DESCRIPTION
Truncate the binary log to the given position. Will not change
anything else.
*/
static void
binlog_trans_log_truncate(THD *thd, my_off_t pos)
{
DBUG_ENTER("binlog_trans_log_truncate");
DBUG_PRINT("enter", ("pos=%u", pos));
DBUG_ASSERT(thd->ha_data[binlog_hton->slot] != NULL);
/* Only true if binlog_trans_log_savepos() wasn't called before */
DBUG_ASSERT(pos != ~(my_off_t) 0);
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
trx_data->truncate(pos);
DBUG_VOID_RETURN;
}
/* /*
this function is mostly a placeholder. this function is mostly a placeholder.
conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open) conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open)
...@@ -1175,27 +1319,62 @@ static int binlog_close_connection(handlerton *hton, THD *thd) ...@@ -1175,27 +1319,62 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
{ {
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty()); DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty());
close_cached_file(trans_log);
thd->ha_data[binlog_hton->slot]= 0; thd->ha_data[binlog_hton->slot]= 0;
trx_data->~binlog_trx_data();
my_free((gptr)trx_data, MYF(0)); my_free((gptr)trx_data, MYF(0));
return 0; return 0;
} }
/*
End a transaction.
SYNOPSIS
binlog_end_trans()
thd The thread whose transaction should be ended
trx_data Pointer to the transaction data to use
end_ev The end event to use, or NULL
all True if the entire transaction should be ended, false if
only the statement transaction should be ended.
DESCRIPTION
End the currently open transaction. The transaction can be either
a real transaction (if 'all' is true) or a statement transaction
(if 'all' is false).
If 'end_ev' is NULL, the transaction is a rollback of only
transactional tables, so the transaction cache will be truncated
to either just before the last opened statement transaction (if
'all' is false), or reset completely (if 'all' is true).
*/
static int static int
binlog_end_trans(THD *thd, binlog_trx_data *trx_data, binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
Log_event *end_ev) Log_event *end_ev, bool all)
{ {
DBUG_ENTER("binlog_end_trans"); DBUG_ENTER("binlog_end_trans");
int error=0; int error=0;
IO_CACHE *trans_log= &trx_data->trans_log; IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_PRINT("enter", ("transaction: %s, end_ev=%p",
all ? "all" : "stmt", end_ev));
DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
/*
/* NULL denotes ROLLBACK with nothing to replicate */ NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
only transactional tables. If the transaction contain changes to
any non-transactiona tables, we need write the transaction and log
a ROLLBACK last.
*/
if (end_ev != NULL) if (end_ev != NULL)
{ {
/* /*
Doing a commit or a rollback including non-transactional tables,
i.e., ending a transaction where we might write the transaction
cache to the binary log.
We can always end the statement when ending a transaction since We can always end the statement when ending a transaction since
transactions are not allowed inside stored functions. If they transactions are not allowed inside stored functions. If they
were, we would have to ensure that we're not ending a statement were, we would have to ensure that we're not ending a statement
...@@ -1204,38 +1383,55 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, ...@@ -1204,38 +1383,55 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
#ifdef HAVE_ROW_BASED_REPLICATION #ifdef HAVE_ROW_BASED_REPLICATION
thd->binlog_flush_pending_rows_event(TRUE); thd->binlog_flush_pending_rows_event(TRUE);
#endif #endif
error= mysql_bin_log.write(thd, trans_log, end_ev); /*
} We write the transaction cache to the binary log if either we're
#ifdef HAVE_ROW_BASED_REPLICATION committing the entire transaction, or if we are doing an
else autocommit outside a transaction.
*/
if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
{ {
error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev);
trx_data->reset();
#ifdef HAVE_ROW_BASED_REPLICATION #ifdef HAVE_ROW_BASED_REPLICATION
thd->binlog_delete_pending_rows_event();
#endif
}
/* /*
We need to step the table map version both after writing the
entire transaction to the log file and after rolling back the
transaction.
We need to step the table map version after writing the We need to step the table map version after writing the
transaction cache to disk. In addition, we need to step the table transaction cache to disk.
map version on a rollback to ensure that a new table map event is
generated instead of the one that was written to the thrown-away
transaction cache.
*/ */
mysql_bin_log.update_table_map_version(); mysql_bin_log.update_table_map_version();
#endif #endif
statistic_increment(binlog_cache_use, &LOCK_status); statistic_increment(binlog_cache_use, &LOCK_status);
if (trans_log->disk_writes != 0) if (trans_log->disk_writes != 0)
{ {
statistic_increment(binlog_cache_disk_use, &LOCK_status); statistic_increment(binlog_cache_disk_use, &LOCK_status);
trans_log->disk_writes= 0; trans_log->disk_writes= 0;
} }
reinit_io_cache(trans_log, WRITE_CACHE, (my_off_t) 0, 0, 1); // cannot fail }
trans_log->end_of_file= max_binlog_cache_size; }
#ifdef HAVE_ROW_BASED_REPLICATION
else
{
/*
If rolling back an entire transaction or a single statement not
inside a transaction, we reset the transaction cache.
If rolling back a statement in a transaction, we truncate the
transaction cache to remove the statement.
*/
if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
trx_data->reset();
else
trx_data->truncate(trx_data->before_stmt_pos); // ...statement
/*
We need to step the table map version on a rollback to ensure
that a new table map event is generated instead of the one that
was written to the thrown-away transaction cache.
*/
mysql_bin_log.update_table_map_version();
}
#endif
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -1252,26 +1448,31 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) ...@@ -1252,26 +1448,31 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
static int binlog_commit(handlerton *hton, THD *thd, bool all) static int binlog_commit(handlerton *hton, THD *thd, bool all)
{ {
int error= 0;
DBUG_ENTER("binlog_commit"); DBUG_ENTER("binlog_commit");
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
IO_CACHE *trans_log= &trx_data->trans_log; IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open() && DBUG_ASSERT(mysql_bin_log.is_open());
(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))));
if (trx_data->empty()) if (all && trx_data->empty())
{ {
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log() // we're here because trans_log was flushed in MYSQL_BIN_LOG::log()
trx_data->reset();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
if (all) if (all)
{ {
Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE); Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE);
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev)); int error= binlog_end_trans(thd, trx_data, &qev, all);
DBUG_RETURN(error);
} }
else else
DBUG_RETURN(binlog_end_trans(thd, trx_data, &invisible_commit)); {
int error= binlog_end_trans(thd, trx_data, &invisible_commit, all);
DBUG_RETURN(error);
}
} }
static int binlog_rollback(handlerton *hton, THD *thd, bool all) static int binlog_rollback(handlerton *hton, THD *thd, bool all)
...@@ -1281,13 +1482,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) ...@@ -1281,13 +1482,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
IO_CACHE *trans_log= &trx_data->trans_log; IO_CACHE *trans_log= &trx_data->trans_log;
/* DBUG_ASSERT(mysql_bin_log.is_open());
First assert is guaranteed - see trans_register_ha() call below.
The second must be true. If it is not, we're registering if (trx_data->empty()) {
unnecessary, doing extra work. The cause should be found and eliminated trx_data->reset();
*/ DBUG_RETURN(0);
DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); }
DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty());
/* /*
Update the binary log with a BEGIN/ROLLBACK block if we have Update the binary log with a BEGIN/ROLLBACK block if we have
cached some queries and we updated some non-transactional cached some queries and we updated some non-transactional
...@@ -1299,10 +1500,10 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) ...@@ -1299,10 +1500,10 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
{ {
Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE); Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE);
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
error= binlog_end_trans(thd, trx_data, &qev); error= binlog_end_trans(thd, trx_data, &qev, all);
} }
else else
error= binlog_end_trans(thd, trx_data, 0); error= binlog_end_trans(thd, trx_data, 0, all);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -1330,11 +1531,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) ...@@ -1330,11 +1531,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
{ {
DBUG_ENTER("binlog_savepoint_set"); DBUG_ENTER("binlog_savepoint_set");
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(&trx_data->trans_log));
*(my_off_t *)sv= my_b_tell(&trx_data->trans_log); binlog_trans_log_savepos(thd, (my_off_t*) sv);
/* Write it to the binary log */ /* Write it to the binary log */
int const error= int const error=
...@@ -1349,7 +1547,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) ...@@ -1349,7 +1547,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
IO_CACHE *trans_log= &trx_data->trans_log; IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); DBUG_ASSERT(mysql_bin_log.is_open());
/* /*
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
...@@ -1364,7 +1562,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) ...@@ -1364,7 +1562,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
thd->query, thd->query_length, TRUE, FALSE); thd->query, thd->query_length, TRUE, FALSE);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
reinit_io_cache(trans_log, WRITE_CACHE, *(my_off_t *)sv, 0, 0); binlog_trans_log_truncate(thd, *(my_off_t*)sv);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2495,7 +2693,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) ...@@ -2495,7 +2693,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
thread. If the transaction involved MyISAM tables, it should go thread. If the transaction involved MyISAM tables, it should go
into binlog even on rollback. into binlog even on rollback.
*/ */
(void) pthread_mutex_lock(&LOCK_thread_count); VOID(pthread_mutex_lock(&LOCK_thread_count));
/* Save variables so that we can reopen the log */ /* Save variables so that we can reopen the log */
save_name=name; save_name=name;
...@@ -2527,7 +2725,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) ...@@ -2527,7 +2725,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
my_free((gptr) save_name, MYF(0)); my_free((gptr) save_name, MYF(0));
err: err:
(void) pthread_mutex_unlock(&LOCK_thread_count); VOID(pthread_mutex_unlock(&LOCK_thread_count));
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
pthread_mutex_unlock(&LOCK_log); pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -3093,18 +3291,76 @@ int THD::binlog_setup_trx_data() ...@@ -3093,18 +3291,76 @@ int THD::binlog_setup_trx_data()
ha_data[binlog_hton->slot]= 0; ha_data[binlog_hton->slot]= 0;
DBUG_RETURN(1); // Didn't manage to set it up DBUG_RETURN(1); // Didn't manage to set it up
} }
trx_data->trans_log.end_of_file= max_binlog_cache_size;
trx_data= new (ha_data[binlog_hton->slot]) binlog_trx_data;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
#ifdef HAVE_ROW_BASED_REPLICATION
/* /*
Write a table map to the binary log. Function to start a statement and optionally a transaction for the
binary log.
This function is called from ha_external_lock() after the storage SYNOPSIS
engine has registered for the transaction. binlog_start_trans_and_stmt()
DESCRIPTION
This function does three things:
- Start a transaction if not in autocommit mode or if a BEGIN
statement has been seen.
- Start a statement transaction to allow us to truncate the binary
log.
- Save the currrent binlog position so that we can roll back the
statement by truncating the transaction log.
We only update the saved position if the old one was undefined,
the reason is that there are some cases (e.g., for CREATE-SELECT)
where the position is saved twice (e.g., both in
select_create::prepare() and THD::binlog_write_table_map()) , but
we should use the first. This means that calls to this function
can be used to start the statement before the first table map
event, to include some extra events.
*/
void
THD::binlog_start_trans_and_stmt()
{
DBUG_ENTER("binlog_start_trans_and_stmt");
binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data));
if (trx_data)
DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u",
trx_data->before_stmt_pos));
if (trx_data == NULL ||
trx_data->before_stmt_pos == MY_OFF_T_UNDEF)
{
/*
The call to binlog_trans_log_savepos() might create the trx_data
structure, if it didn't exist before, so we save the position
into an auto variable and then write it into the transaction
data for the binary log (i.e., trx_data).
*/
my_off_t pos= 0;
binlog_trans_log_savepos(this, &pos);
trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
trx_data->before_stmt_pos= pos;
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(this, TRUE, binlog_hton);
trans_register_ha(this, FALSE, binlog_hton);
}
DBUG_VOID_RETURN;
}
/*
Write a table map to the binary log.
*/ */
#ifdef HAVE_ROW_BASED_REPLICATION
int THD::binlog_write_table_map(TABLE *table, bool is_trans) int THD::binlog_write_table_map(TABLE *table, bool is_trans)
{ {
int error; int error;
...@@ -3123,10 +3379,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) ...@@ -3123,10 +3379,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans)
Table_map_log_event Table_map_log_event
the_event(this, table, table->s->table_map_id, is_trans, flags); the_event(this, table, table->s->table_map_id, is_trans, flags);
if (is_trans) if (is_trans && binlog_table_maps == 0)
trans_register_ha(this, binlog_start_trans_and_stmt();
(options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0,
binlog_hton);
if ((error= mysql_bin_log.write(&the_event))) if ((error= mysql_bin_log.write(&the_event)))
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -3147,7 +3401,7 @@ THD::binlog_get_pending_rows_event() const ...@@ -3147,7 +3401,7 @@ THD::binlog_get_pending_rows_event() const
(since the trx_data is set up there). In that case, we just return (since the trx_data is set up there). In that case, we just return
NULL. NULL.
*/ */
return trx_data ? trx_data->pending : NULL; return trx_data ? trx_data->pending() : NULL;
} }
void void
...@@ -3160,7 +3414,7 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) ...@@ -3160,7 +3414,7 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev)
(binlog_trx_data*) ha_data[binlog_hton->slot]; (binlog_trx_data*) ha_data[binlog_hton->slot];
DBUG_ASSERT(trx_data); DBUG_ASSERT(trx_data);
trx_data->pending= ev; trx_data->set_pending(ev);
} }
...@@ -3169,8 +3423,9 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) ...@@ -3169,8 +3423,9 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev)
(either cached binlog if transaction, or disk binlog). Sets a new pending (either cached binlog if transaction, or disk binlog). Sets a new pending
event. event.
*/ */
int MYSQL_BIN_LOG:: int
flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event) MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_ASSERT(mysql_bin_log.is_open());
...@@ -3183,9 +3438,9 @@ int MYSQL_BIN_LOG:: ...@@ -3183,9 +3438,9 @@ int MYSQL_BIN_LOG::
DBUG_ASSERT(trx_data); DBUG_ASSERT(trx_data);
DBUG_PRINT("info", ("trx_data->pending=%p", trx_data->pending)); DBUG_PRINT("info", ("trx_data->pending()=%p", trx_data->pending()));
if (Rows_log_event* pending= trx_data->pending) if (Rows_log_event* pending= trx_data->pending())
{ {
IO_CACHE *file= &log_file; IO_CACHE *file= &log_file;
...@@ -3335,15 +3590,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -3335,15 +3590,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
binlog_trx_data *const trx_data= binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
IO_CACHE *trans_log= &trx_data->trans_log; IO_CACHE *trans_log= &trx_data->trans_log;
bool trans_log_in_use= my_b_tell(trans_log) != 0; my_off_t trans_log_pos= my_b_tell(trans_log);
if (event_info->get_cache_stmt() && !trans_log_in_use) if (event_info->get_cache_stmt() || trans_log_pos != 0)
trans_register_ha(thd, {
(thd->options & DBUG_PRINT("info", ("Using trans_log: cache=%d, trans_log_pos=%u",
(OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, event_info->get_cache_stmt(),
binlog_hton); trans_log_pos));
if (event_info->get_cache_stmt() || trans_log_in_use) if (trans_log_pos == 0)
{ thd->binlog_start_trans_and_stmt();
DBUG_PRINT("info", ("Using trans_log"));
file= trans_log; file= trans_log;
} }
/* /*
...@@ -3546,6 +3800,12 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) ...@@ -3546,6 +3800,12 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
{ {
uint length; uint length;
/*
We only bother to write to the binary log if there is anything
to write.
*/
if (my_b_tell(cache) > 0)
{
/* /*
Log "BEGIN" at the beginning of the transaction. Log "BEGIN" at the beginning of the transaction.
which may contain more than 1 SQL statement. which may contain more than 1 SQL statement.
...@@ -3587,10 +3847,10 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) ...@@ -3587,10 +3847,10 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;);
} while ((length=my_b_fill(cache))); } while ((length=my_b_fill(cache)));
if (commit_event->write(&log_file)) if (commit_event && commit_event->write(&log_file))
goto err; goto err;
#ifndef DBUG_OFF #ifndef DBUG_OFF
DBUG_skip_commit: DBUG_skip_commit:
#endif #endif
if (flush_and_sync()) if (flush_and_sync())
goto err; goto err;
...@@ -3602,6 +3862,8 @@ DBUG_skip_commit: ...@@ -3602,6 +3862,8 @@ DBUG_skip_commit:
goto err; goto err;
} }
signal_update(); signal_update();
}
/* /*
if commit_event is Xid_log_event, increase the number of if commit_event is Xid_log_event, increase the number of
prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
...@@ -3610,7 +3872,7 @@ DBUG_skip_commit: ...@@ -3610,7 +3872,7 @@ DBUG_skip_commit:
If the commit_event is not Xid_log_event (then it's a Query_log_event) If the commit_event is not Xid_log_event (then it's a Query_log_event)
rotate binlog, if necessary. rotate binlog, if necessary.
*/ */
if (commit_event->get_type_code() == XID_EVENT) if (commit_event && commit_event->get_type_code() == XID_EVENT)
{ {
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
prepared_xids++; prepared_xids++;
...@@ -4620,12 +4882,17 @@ int TC_LOG_BINLOG::log(THD *thd, my_xid xid) ...@@ -4620,12 +4882,17 @@ int TC_LOG_BINLOG::log(THD *thd, my_xid xid)
Xid_log_event xle(thd, xid); Xid_log_event xle(thd, xid);
binlog_trx_data *trx_data= binlog_trx_data *trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot]; (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value /*
We always commit the entire transaction when writing an XID. Also
note that the return value is inverted.
*/
DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE));
} }
void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{ {
pthread_mutex_lock(&LOCK_prep_xids); pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0);
if (--prepared_xids == 0) if (--prepared_xids == 0)
pthread_cond_signal(&COND_prep_xids); pthread_cond_signal(&COND_prep_xids);
pthread_mutex_unlock(&LOCK_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids);
......
...@@ -32,32 +32,111 @@ ...@@ -32,32 +32,111 @@
#define log_cs &my_charset_latin1 #define log_cs &my_charset_latin1
/*
Cache that will automatically be written to a dedicated file on
destruction.
DESCRIPTION
*/
class Write_on_release_cache
{
public:
enum flag
{
FLUSH_F
};
typedef unsigned short flag_set;
/*
Constructor.
SYNOPSIS
Write_on_release_cache
cache Pointer to cache to use
file File to write cache to upon destruction
flags Flags for the cache
DESCRIPTION
Class used to guarantee copy of cache to file before exiting the
current block. On successful copy of the cache, the cache will
be reinited as a WRITE_CACHE.
Currently, a pointer to the cache is provided in the
constructor, but it would be possible to create a subclass
holding the IO_CACHE itself.
*/
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
: m_cache(cache), m_file(file), m_flags(flags)
{
reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE);
}
~Write_on_release_cache()
{
if (!my_b_copy_to_file(m_cache, m_file))
reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE);
if (m_flags | FLUSH_F)
fflush(m_file);
}
/*
Return a pointer to the internal IO_CACHE.
SYNOPSIS
operator&()
DESCRIPTION
Function to return a pointer to the internal, so that the object
can be treated as a IO_CACHE and used with the my_b_* IO_CACHE
functions
RETURN VALUE
A pointer to the internal IO_CACHE.
*/
IO_CACHE *operator&()
{
return m_cache;
}
private:
// Hidden, to prevent usage.
Write_on_release_cache(Write_on_release_cache const&);
IO_CACHE *m_cache;
FILE *m_file;
flag_set m_flags;
};
/* /*
pretty_print_str() pretty_print_str()
*/ */
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len) static void pretty_print_str(IO_CACHE* cache, char* str, int len)
{ {
char* end = str + len; char* end = str + len;
fputc('\'', file); my_b_printf(cache, "\'");
while (str < end) while (str < end)
{ {
char c; char c;
switch ((c=*str++)) { switch ((c=*str++)) {
case '\n': fprintf(file, "\\n"); break; case '\n': my_b_printf(cache, "\\n"); break;
case '\r': fprintf(file, "\\r"); break; case '\r': my_b_printf(cache, "\\r"); break;
case '\\': fprintf(file, "\\\\"); break; case '\\': my_b_printf(cache, "\\\\"); break;
case '\b': fprintf(file, "\\b"); break; case '\b': my_b_printf(cache, "\\b"); break;
case '\t': fprintf(file, "\\t"); break; case '\t': my_b_printf(cache, "\\t"); break;
case '\'': fprintf(file, "\\'"); break; case '\'': my_b_printf(cache, "\\'"); break;
case 0 : fprintf(file, "\\0"); break; case 0 : my_b_printf(cache, "\\0"); break;
default: default:
fputc(c, file); my_b_printf(cache, "%c", c);
break; break;
} }
} }
fputc('\'', file); my_b_printf(cache, "\'");
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -294,14 +373,15 @@ append_query_string(CHARSET_INFO *csinfo, ...@@ -294,14 +373,15 @@ append_query_string(CHARSET_INFO *csinfo,
*/ */
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, static void print_set_option(IO_CACHE* file, uint32 bits_changed,
uint32 flags, const char* name, bool* need_comma) uint32 option, uint32 flags, const char* name,
bool* need_comma)
{ {
if (bits_changed & option) if (bits_changed & option)
{ {
if (*need_comma) if (*need_comma)
fprintf(file,", "); my_b_printf(file,", ");
fprintf(file,"%s=%d", name, test(flags & option)); my_b_printf(file,"%s=%d", name, test(flags & option));
*need_comma= 1; *need_comma= 1;
} }
} }
...@@ -960,20 +1040,23 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ...@@ -960,20 +1040,23 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
Log_event::print_header() Log_event::print_header()
*/ */
void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) void Log_event::print_header(IO_CACHE* file,
PRINT_EVENT_INFO* print_event_info,
bool is_more __attribute__((unused)))
{ {
char llbuff[22]; char llbuff[22];
my_off_t hexdump_from= print_event_info->hexdump_from; my_off_t hexdump_from= print_event_info->hexdump_from;
DBUG_ENTER("Log_event::print_header");
fputc('#', file); my_b_printf(file, "#");
print_timestamp(file); print_timestamp(file);
fprintf(file, " server id %d end_log_pos %s ", server_id, my_b_printf(file, " server id %d end_log_pos %s ", server_id,
llstr(log_pos,llbuff)); llstr(log_pos,llbuff));
/* mysqlbinlog --hexdump */ /* mysqlbinlog --hexdump */
if (print_event_info->hexdump_from) if (print_event_info->hexdump_from)
{ {
fprintf(file, "\n"); my_b_printf(file, "\n");
uchar *ptr= (uchar*)temp_buf; uchar *ptr= (uchar*)temp_buf;
my_off_t size= my_off_t size=
uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN; uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN;
...@@ -986,15 +1069,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -986,15 +1069,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info)
/* Pretty-print event common header if header is exactly 19 bytes */ /* Pretty-print event common header if header is exactly 19 bytes */
if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN) if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN)
{ {
fprintf(file, "# Position Timestamp Type Master ID " char emit_buf[256]; // Enough for storing one line
my_b_printf(file, "# Position Timestamp Type Master ID "
"Size Master Pos Flags \n"); "Size Master Pos Flags \n");
fprintf(file, "# %8.8lx %02x %02x %02x %02x %02x " int const bytes_written=
my_snprintf(emit_buf, sizeof(emit_buf),
"# %8.8lx %02x %02x %02x %02x %02x "
"%02x %02x %02x %02x %02x %02x %02x %02x " "%02x %02x %02x %02x %02x %02x %02x %02x "
"%02x %02x %02x %02x %02x %02x\n", "%02x %02x %02x %02x %02x %02x\n",
(unsigned long) hexdump_from, (unsigned long) hexdump_from,
ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6],
ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13],
ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]);
DBUG_ASSERT(bytes_written >= 0);
DBUG_ASSERT(static_cast<my_size_t>(bytes_written) < sizeof(emit_buf));
my_b_write(file, (byte*) emit_buf, bytes_written);
ptr += LOG_EVENT_MINIMAL_HEADER_LEN; ptr += LOG_EVENT_MINIMAL_HEADER_LEN;
hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN; hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN;
} }
...@@ -1011,9 +1100,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -1011,9 +1100,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info)
if (i % 16 == 15) if (i % 16 == 15)
{ {
fprintf(file, "# %8.8lx %-48.48s |%16s|\n", /*
my_b_printf() does not support full printf() formats, so we
have to do it this way.
TODO: Rewrite my_b_printf() to support full printf() syntax.
*/
char emit_buf[256];
int const bytes_written=
my_snprintf(emit_buf, sizeof(emit_buf),
"# %8.8lx %-48.48s |%16s|\n",
(unsigned long) (hexdump_from + (i & 0xfffffff0)), (unsigned long) (hexdump_from + (i & 0xfffffff0)),
hex_string, char_string); hex_string, char_string);
DBUG_ASSERT(bytes_written >= 0);
DBUG_ASSERT(static_cast<my_size_t>(bytes_written) < sizeof(emit_buf));
my_b_write(file, (byte*) emit_buf, bytes_written);
hex_string[0]= 0; hex_string[0]= 0;
char_string[0]= 0; char_string[0]= 0;
c= char_string; c= char_string;
...@@ -1025,28 +1126,52 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -1025,28 +1126,52 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info)
/* Non-full last line */ /* Non-full last line */
if (hex_string[0]) if (hex_string[0])
fprintf(file, "# %8.8lx %-48.48s |%s|\n# ", {
char emit_buf[256];
int const bytes_written=
my_snprintf(emit_buf, sizeof(emit_buf),
"# %8.8lx %-48.48s |%s|\n# ",
(unsigned long) (hexdump_from + (i & 0xfffffff0)), (unsigned long) (hexdump_from + (i & 0xfffffff0)),
hex_string, char_string); hex_string, char_string);
DBUG_ASSERT(bytes_written >= 0);
DBUG_ASSERT(static_cast<my_size_t>(bytes_written) < sizeof(emit_buf));
my_b_write(file, (byte*) emit_buf, bytes_written);
}
} }
DBUG_VOID_RETURN;
} }
void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) void Log_event::print_base64(IO_CACHE* file,
PRINT_EVENT_INFO* print_event_info,
bool more)
{ {
uchar *ptr= (uchar*)temp_buf; const uchar *ptr= (const uchar *)temp_buf;
my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET); my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET);
char *tmp_str= DBUG_ENTER("Log_event::print_base64");
(char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME));
size_t const tmp_str_sz= base64_needed_encoded_length(size);
char *const tmp_str= (char *) my_malloc(tmp_str_sz, MYF(MY_WME));
if (!tmp_str) { if (!tmp_str) {
fprintf(stderr, "\nError: Out of memory. " fprintf(stderr, "\nError: Out of memory. "
"Could not print correct binlog event.\n"); "Could not print correct binlog event.\n");
return; DBUG_VOID_RETURN;
} }
int res= base64_encode(ptr, size, tmp_str);
fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str); int const res= base64_encode(ptr, size, tmp_str);
DBUG_ASSERT(res == 0);
if (my_b_tell(file) == 0)
my_b_printf(file, "\nBINLOG '\n");
my_b_printf(file, "%s\n", tmp_str);
if (!more)
my_b_printf(file, "';\n");
my_free(tmp_str, MYF(0)); my_free(tmp_str, MYF(0));
DBUG_VOID_RETURN;
} }
...@@ -1054,9 +1179,10 @@ void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -1054,9 +1179,10 @@ void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info)
Log_event::print_timestamp() Log_event::print_timestamp()
*/ */
void Log_event::print_timestamp(FILE* file, time_t* ts) void Log_event::print_timestamp(IO_CACHE* file, time_t* ts)
{ {
struct tm *res; struct tm *res;
DBUG_ENTER("Log_event::print_timestamp");
if (!ts) if (!ts)
ts = &when; ts = &when;
#ifdef MYSQL_SERVER // This is always false #ifdef MYSQL_SERVER // This is always false
...@@ -1066,13 +1192,14 @@ void Log_event::print_timestamp(FILE* file, time_t* ts) ...@@ -1066,13 +1192,14 @@ void Log_event::print_timestamp(FILE* file, time_t* ts)
res=localtime(ts); res=localtime(ts);
#endif #endif
fprintf(file,"%02d%02d%02d %2d:%02d:%02d", my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d",
res->tm_year % 100, res->tm_year % 100,
res->tm_mon+1, res->tm_mon+1,
res->tm_mday, res->tm_mday,
res->tm_hour, res->tm_hour,
res->tm_min, res->tm_min,
res->tm_sec); res->tm_sec);
DBUG_VOID_RETURN;
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -1544,7 +1671,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, ...@@ -1544,7 +1671,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
*/ */
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Query_log_event::print_query_header(FILE* file, void Query_log_event::print_query_header(IO_CACHE* file,
PRINT_EVENT_INFO* print_event_info) PRINT_EVENT_INFO* print_event_info)
{ {
// TODO: print the catalog ?? // TODO: print the catalog ??
...@@ -1554,9 +1681,10 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1554,9 +1681,10 @@ void Query_log_event::print_query_header(FILE* file,
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(file, print_event_info, FALSE);
fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", my_b_printf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code); get_type_str(), (ulong) thread_id, (ulong) exec_time,
error_code);
} }
if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db) if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db)
...@@ -1564,15 +1692,15 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1564,15 +1692,15 @@ void Query_log_event::print_query_header(FILE* file,
if (different_db= memcmp(print_event_info->db, db, db_len + 1)) if (different_db= memcmp(print_event_info->db, db, db_len + 1))
memcpy(print_event_info->db, db, db_len + 1); memcpy(print_event_info->db, db, db_len + 1);
if (db[0] && different_db) if (db[0] && different_db)
fprintf(file, "use %s;\n", db); my_b_printf(file, "use %s;\n", db);
} }
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
*end++=';'; *end++=';';
*end++='\n'; *end++='\n';
my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME)); my_b_write(file, (byte*) buff, (uint) (end-buff));
if (flags & LOG_EVENT_THREAD_SPECIFIC_F) if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); my_b_printf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id);
/* /*
If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to
...@@ -1594,14 +1722,14 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1594,14 +1722,14 @@ void Query_log_event::print_query_header(FILE* file,
if (unlikely(tmp)) /* some bits have changed */ if (unlikely(tmp)) /* some bits have changed */
{ {
bool need_comma= 0; bool need_comma= 0;
fprintf(file, "SET "); my_b_printf(file, "SET ");
print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2, print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2,
"@@session.foreign_key_checks", &need_comma); "@@session.foreign_key_checks", &need_comma);
print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2, print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2,
"@@session.sql_auto_is_null", &need_comma); "@@session.sql_auto_is_null", &need_comma);
print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2, print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2,
"@@session.unique_checks", &need_comma); "@@session.unique_checks", &need_comma);
fprintf(file,";\n"); my_b_printf(file,";\n");
print_event_info->flags2= flags2; print_event_info->flags2= flags2;
} }
} }
...@@ -1629,14 +1757,14 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1629,14 +1757,14 @@ void Query_log_event::print_query_header(FILE* file,
} }
if (unlikely(print_event_info->sql_mode != sql_mode)) if (unlikely(print_event_info->sql_mode != sql_mode))
{ {
fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); my_b_printf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode);
print_event_info->sql_mode= sql_mode; print_event_info->sql_mode= sql_mode;
} }
} }
if (print_event_info->auto_increment_increment != auto_increment_increment || if (print_event_info->auto_increment_increment != auto_increment_increment ||
print_event_info->auto_increment_offset != auto_increment_offset) print_event_info->auto_increment_offset != auto_increment_offset)
{ {
fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", my_b_printf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n",
auto_increment_increment,auto_increment_offset); auto_increment_increment,auto_increment_offset);
print_event_info->auto_increment_increment= auto_increment_increment; print_event_info->auto_increment_increment= auto_increment_increment;
print_event_info->auto_increment_offset= auto_increment_offset; print_event_info->auto_increment_offset= auto_increment_offset;
...@@ -1656,9 +1784,10 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1656,9 +1784,10 @@ void Query_log_event::print_query_header(FILE* file,
CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME)); CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME));
if (cs_info) if (cs_info)
{ {
fprintf(file, "/*!\\C %s */;\n", cs_info->csname); /* for mysql client */ /* for mysql client */
my_b_printf(file, "/*!\\C %s */;\n", cs_info->csname);
} }
fprintf(file,"SET " my_b_printf(file,"SET "
"@@session.character_set_client=%d," "@@session.character_set_client=%d,"
"@@session.collation_connection=%d," "@@session.collation_connection=%d,"
"@@session.collation_server=%d" "@@session.collation_server=%d"
...@@ -1673,7 +1802,7 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1673,7 +1802,7 @@ void Query_log_event::print_query_header(FILE* file,
{ {
if (bcmp(print_event_info->time_zone_str, time_zone_str, time_zone_len+1)) if (bcmp(print_event_info->time_zone_str, time_zone_str, time_zone_len+1))
{ {
fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str); my_b_printf(file,"SET @@session.time_zone='%s';\n", time_zone_str);
memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1); memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1);
} }
} }
...@@ -1682,9 +1811,11 @@ void Query_log_event::print_query_header(FILE* file, ...@@ -1682,9 +1811,11 @@ void Query_log_event::print_query_header(FILE* file,
void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
print_query_header(file, print_event_info); Write_on_release_cache cache(&print_event_info->head_cache, file);
my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
fputs(";\n", file); print_query_header(&cache, print_event_info);
my_b_write(&cache, (byte*) query, q_len);
my_b_printf(&cache, ";\n");
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -2014,17 +2145,22 @@ void Start_log_event_v3::pack_info(Protocol *protocol) ...@@ -2014,17 +2145,22 @@ void Start_log_event_v3::pack_info(Protocol *protocol)
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
DBUG_ENTER("Start_log_event_v3::print");
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, my_b_printf(&cache, "\tStart: binlog v %d, server v %s created ",
server_version); binlog_version, server_version);
print_timestamp(file); print_timestamp(&cache);
if (created) if (created)
fprintf(file," at startup"); my_b_printf(&cache," at startup");
fputc('\n', file); my_b_printf(&cache, "\n");
if (flags & LOG_EVENT_BINLOG_IN_USE_F) if (flags & LOG_EVENT_BINLOG_IN_USE_F)
fprintf(file, "# Warning: this binlog was not closed properly. " my_b_printf(&cache, "# Warning: this binlog was not closed properly. "
"Most probably mysqld crashed writing it.\n"); "Most probably mysqld crashed writing it.\n");
} }
if (!artificial_event && created) if (!artificial_event && created)
...@@ -2036,12 +2172,12 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -2036,12 +2172,12 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
and rollback unfinished transaction. and rollback unfinished transaction.
Probably this can be done with RESET CONNECTION (syntax to be defined). Probably this can be done with RESET CONNECTION (syntax to be defined).
*/ */
fprintf(file,"RESET CONNECTION;\n"); my_b_printf(&cache,"RESET CONNECTION;\n");
#else #else
fprintf(file,"ROLLBACK;\n"); my_b_printf(&cache,"ROLLBACK;\n");
#endif #endif
} }
fflush(file); DBUG_VOID_RETURN;
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -2381,19 +2517,19 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2381,19 +2517,19 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
if (server_id == (uint32) ::server_id) if (server_id == (uint32) ::server_id)
{ {
/* /*
Do not modify rli->group_master_log_pos, as this event did not exist on We only increase the relay log position if we are skipping
the master. That is, just update the *relay log* coordinates; this is events and do not touch any group_* variables, nor flush the
done by passing log_pos=0 to inc_group_relay_log_pos, like we do in relay log info. If there is a crash, we will have to re-skip
Stop_log_event::exec_event(). the events again, but that is a minor issue.
If in a transaction, don't touch group_* coordinates.
If we do not skip stepping the group log position (and the
server id was changed when restarting the server), it might well
be that we start executing at a position that is invalid, e.g.,
at a Rows_log_event or a Query_log_event preceeded by a
Intvar_log_event instead of starting at a Table_map_log_event or
the Intvar_log_event respectively.
*/ */
if (thd->options & OPTION_BEGIN)
rli->inc_event_relay_log_pos(); rli->inc_event_relay_log_pos();
else
{
rli->inc_group_relay_log_pos(0);
flush_relay_log_info(rli);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2765,14 +2901,16 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -2765,14 +2901,16 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
} }
void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info,
bool commented) bool commented)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file_arg);
DBUG_ENTER("Load_log_event::print"); DBUG_ENTER("Load_log_event::print");
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n", my_b_printf(&cache, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
thread_id, exec_time); thread_id, exec_time);
} }
...@@ -2791,65 +2929,65 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, ...@@ -2791,65 +2929,65 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info,
} }
if (db && db[0] && different_db) if (db && db[0] && different_db)
fprintf(file, "%suse %s;\n", my_b_printf(&cache, "%suse %s;\n",
commented ? "# " : "", commented ? "# " : "",
db); db);
if (flags & LOG_EVENT_THREAD_SPECIFIC_F) if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
fprintf(file,"%sSET @@session.pseudo_thread_id=%lu;\n", my_b_printf(&cache,"%sSET @@session.pseudo_thread_id=%lu;\n",
commented ? "# " : "", (ulong)thread_id); commented ? "# " : "", (ulong)thread_id);
fprintf(file, "%sLOAD DATA ", my_b_printf(&cache, "%sLOAD DATA ",
commented ? "# " : ""); commented ? "# " : "");
if (check_fname_outside_temp_buf()) if (check_fname_outside_temp_buf())
fprintf(file, "LOCAL "); my_b_printf(&cache, "LOCAL ");
fprintf(file, "INFILE '%-*s' ", fname_len, fname); my_b_printf(&cache, "INFILE '%-*s' ", fname_len, fname);
if (sql_ex.opt_flags & REPLACE_FLAG) if (sql_ex.opt_flags & REPLACE_FLAG)
fprintf(file," REPLACE "); my_b_printf(&cache," REPLACE ");
else if (sql_ex.opt_flags & IGNORE_FLAG) else if (sql_ex.opt_flags & IGNORE_FLAG)
fprintf(file," IGNORE "); my_b_printf(&cache," IGNORE ");
fprintf(file, "INTO TABLE `%s`", table_name); my_b_printf(&cache, "INTO TABLE `%s`", table_name);
fprintf(file, " FIELDS TERMINATED BY "); my_b_printf(&cache, " FIELDS TERMINATED BY ");
pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); pretty_print_str(&cache, sql_ex.field_term, sql_ex.field_term_len);
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
fprintf(file," OPTIONALLY "); my_b_printf(&cache," OPTIONALLY ");
fprintf(file, " ENCLOSED BY "); my_b_printf(&cache, " ENCLOSED BY ");
pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); pretty_print_str(&cache, sql_ex.enclosed, sql_ex.enclosed_len);
fprintf(file, " ESCAPED BY "); my_b_printf(&cache, " ESCAPED BY ");
pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); pretty_print_str(&cache, sql_ex.escaped, sql_ex.escaped_len);
fprintf(file," LINES TERMINATED BY "); my_b_printf(&cache," LINES TERMINATED BY ");
pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len); pretty_print_str(&cache, sql_ex.line_term, sql_ex.line_term_len);
if (sql_ex.line_start) if (sql_ex.line_start)
{ {
fprintf(file," STARTING BY "); my_b_printf(&cache," STARTING BY ");
pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); pretty_print_str(&cache, sql_ex.line_start, sql_ex.line_start_len);
} }
if ((long) skip_lines > 0) if ((long) skip_lines > 0)
fprintf(file, " IGNORE %ld LINES", (long) skip_lines); my_b_printf(&cache, " IGNORE %ld LINES", (long) skip_lines);
if (num_fields) if (num_fields)
{ {
uint i; uint i;
const char* field = fields; const char* field = fields;
fprintf(file, " ("); my_b_printf(&cache, " (");
for (i = 0; i < num_fields; i++) for (i = 0; i < num_fields; i++)
{ {
if (i) if (i)
fputc(',', file); my_b_printf(&cache, ",");
fprintf(file, field); my_b_printf(&cache, field);
field += field_lens[i] + 1; field += field_lens[i] + 1;
} }
fputc(')', file); my_b_printf(&cache, ")");
} }
fprintf(file, ";\n"); my_b_printf(&cache, ";\n");
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -3185,17 +3323,16 @@ void Rotate_log_event::pack_info(Protocol *protocol) ...@@ -3185,17 +3323,16 @@ void Rotate_log_event::pack_info(Protocol *protocol)
void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
char buf[22]; char buf[22];
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tRotate to "); my_b_printf(&cache, "\tRotate to ");
if (new_log_ident) if (new_log_ident)
my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, my_b_write(&cache, (byte*) new_log_ident, (uint)ident_len);
MYF(MY_NABP | MY_WME)); my_b_printf(&cache, " pos: %s\n", llstr(pos, buf));
fprintf(file, " pos: %s", llstr(pos, buf));
fputc('\n', file);
fflush(file);
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -3407,14 +3544,16 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -3407,14 +3544,16 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
char llbuff[22]; char llbuff[22];
const char *msg; const char *msg;
LINT_INIT(msg); LINT_INIT(msg);
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tIntvar\n"); my_b_printf(&cache, "\tIntvar\n");
} }
fprintf(file, "SET "); my_b_printf(&cache, "SET ");
switch (type) { switch (type) {
case LAST_INSERT_ID_EVENT: case LAST_INSERT_ID_EVENT:
msg="LAST_INSERT_ID"; msg="LAST_INSERT_ID";
...@@ -3427,8 +3566,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -3427,8 +3566,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
msg="INVALID_INT"; msg="INVALID_INT";
break; break;
} }
fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); my_b_printf(&cache, "%s=%s;\n", msg, llstr(val,llbuff));
fflush(file);
} }
#endif #endif
...@@ -3497,15 +3635,17 @@ bool Rand_log_event::write(IO_CACHE* file) ...@@ -3497,15 +3635,17 @@ bool Rand_log_event::write(IO_CACHE* file)
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
char llbuff[22],llbuff2[22]; char llbuff[22],llbuff2[22];
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tRand\n"); my_b_printf(&cache, "\tRand\n");
} }
fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", my_b_printf(&cache, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n",
llstr(seed1, llbuff),llstr(seed2, llbuff2)); llstr(seed1, llbuff),llstr(seed2, llbuff2));
fflush(file);
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -3567,16 +3707,18 @@ bool Xid_log_event::write(IO_CACHE* file) ...@@ -3567,16 +3707,18 @@ bool Xid_log_event::write(IO_CACHE* file)
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
char buf[64]; char buf[64];
longlong10_to_str(xid, buf, 10); longlong10_to_str(xid, buf, 10);
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tXid = %s\n", buf); my_b_printf(&cache, "\tXid = %s\n", buf);
fflush(file);
} }
fprintf(file, "COMMIT;\n"); my_b_printf(&cache, "COMMIT;\n");
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -3766,19 +3908,22 @@ bool User_var_log_event::write(IO_CACHE* file) ...@@ -3766,19 +3908,22 @@ bool User_var_log_event::write(IO_CACHE* file)
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tUser_var\n"); my_b_printf(&cache, "\tUser_var\n");
} }
fprintf(file, "SET @`"); my_b_printf(&cache, "SET @`");
my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME)); my_b_write(&cache, (byte*) name, (uint) (name_len));
fprintf(file, "`"); my_b_printf(&cache, "`");
if (is_null) if (is_null)
{ {
fprintf(file, ":=NULL;\n"); my_b_printf(&cache, ":=NULL;\n");
} }
else else
{ {
...@@ -3786,12 +3931,12 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -3786,12 +3931,12 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
case REAL_RESULT: case REAL_RESULT:
double real_val; double real_val;
float8get(real_val, val); float8get(real_val, val);
fprintf(file, ":=%.14g;\n", real_val); my_b_printf(&cache, ":=%.14g;\n", real_val);
break; break;
case INT_RESULT: case INT_RESULT:
char int_buf[22]; char int_buf[22];
longlong10_to_str(uint8korr(val), int_buf, -10); longlong10_to_str(uint8korr(val), int_buf, -10);
fprintf(file, ":=%s;\n", int_buf); my_b_printf(&cache, ":=%s;\n", int_buf);
break; break;
case DECIMAL_RESULT: case DECIMAL_RESULT:
{ {
...@@ -3807,7 +3952,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -3807,7 +3952,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
bin2decimal(val+2, &dec, precision, scale); bin2decimal(val+2, &dec, precision, scale);
decimal2string(&dec, str_buf, &str_len, 0, 0, 0); decimal2string(&dec, str_buf, &str_len, 0, 0, 0);
str_buf[str_len]= 0; str_buf[str_len]= 0;
fprintf(file, ":=%s;\n",str_buf); my_b_printf(&cache, ":=%s;\n",str_buf);
break; break;
} }
case STRING_RESULT: case STRING_RESULT:
...@@ -3843,9 +3988,9 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -3843,9 +3988,9 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Generate an unusable command (=> syntax error) is probably the best Generate an unusable command (=> syntax error) is probably the best
thing we can do here. thing we can do here.
*/ */
fprintf(file, ":=???;\n"); my_b_printf(&cache, ":=???;\n");
else else
fprintf(file, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); my_b_printf(&cache, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name);
my_afree(hex_str); my_afree(hex_str);
} }
break; break;
...@@ -3855,7 +4000,6 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -3855,7 +4000,6 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
return; return;
} }
} }
fflush(file);
} }
#endif #endif
...@@ -3939,13 +4083,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -3939,13 +4083,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Unknown_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file_arg);
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fputc('\n', file); my_b_printf(&cache, "\n# %s", "Unknown event\n");
fprintf(file, "# %s", "Unknown event\n");
} }
#endif #endif
...@@ -4012,12 +4157,13 @@ Slave_log_event::~Slave_log_event() ...@@ -4012,12 +4157,13 @@ Slave_log_event::~Slave_log_event()
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file);
char llbuff[22]; char llbuff[22];
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fputc('\n', file); my_b_printf(&cache, "\n\
fprintf(file, "\
Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n", Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n",
master_host, master_port, master_log, llstr(master_pos, llbuff)); master_host, master_port, master_log, llstr(master_pos, llbuff));
} }
...@@ -4097,12 +4243,14 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -4097,12 +4243,14 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli)
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fprintf(file, "\tStop\n"); my_b_printf(&cache, "\tStop\n");
fflush(file);
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -4277,6 +4425,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, ...@@ -4277,6 +4425,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len,
void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info,
bool enable_local) bool enable_local)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file);
if (print_event_info->short_form) if (print_event_info->short_form)
{ {
if (enable_local && check_fname_outside_temp_buf()) if (enable_local && check_fname_outside_temp_buf())
...@@ -4292,10 +4442,10 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info ...@@ -4292,10 +4442,10 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info
That one is for "file_id: etc" below: in mysqlbinlog we want the #, in That one is for "file_id: etc" below: in mysqlbinlog we want the #, in
SHOW BINLOG EVENTS we don't. SHOW BINLOG EVENTS we don't.
*/ */
fprintf(file, "#"); my_b_printf(&cache, "#");
} }
fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len);
} }
...@@ -4465,11 +4615,12 @@ bool Append_block_log_event::write(IO_CACHE* file) ...@@ -4465,11 +4615,12 @@ bool Append_block_log_event::write(IO_CACHE* file)
void Append_block_log_event::print(FILE* file, void Append_block_log_event::print(FILE* file,
PRINT_EVENT_INFO* print_event_info) PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file);
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fputc('\n', file); my_b_printf(&cache, "\n#%s: file_id: %d block_len: %d\n",
fprintf(file, "#%s: file_id: %d block_len: %d\n",
get_type_str(), file_id, block_len); get_type_str(), file_id, block_len);
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -4608,11 +4759,12 @@ bool Delete_file_log_event::write(IO_CACHE* file) ...@@ -4608,11 +4759,12 @@ bool Delete_file_log_event::write(IO_CACHE* file)
void Delete_file_log_event::print(FILE* file, void Delete_file_log_event::print(FILE* file,
PRINT_EVENT_INFO* print_event_info) PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file);
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fputc('\n', file); my_b_printf(&cache, "\n#Delete_file: file_id=%u\n", file_id);
fprintf(file, "#Delete_file: file_id=%u\n", file_id);
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -4703,11 +4855,12 @@ bool Execute_load_log_event::write(IO_CACHE* file) ...@@ -4703,11 +4855,12 @@ bool Execute_load_log_event::write(IO_CACHE* file)
void Execute_load_log_event::print(FILE* file, void Execute_load_log_event::print(FILE* file,
PRINT_EVENT_INFO* print_event_info) PRINT_EVENT_INFO* print_event_info)
{ {
Write_on_release_cache cache(&print_event_info->head_cache, file);
if (print_event_info->short_form) if (print_event_info->short_form)
return; return;
print_header(file, print_event_info); print_header(&cache, print_event_info, FALSE);
fputc('\n', file); my_b_printf(&cache, "\n#Exec_load: file_id=%d\n",
fprintf(file, "#Exec_load: file_id=%d\n",
file_id); file_id);
} }
#endif #endif
...@@ -4925,29 +5078,30 @@ void Execute_load_query_log_event::print(FILE* file, ...@@ -4925,29 +5078,30 @@ void Execute_load_query_log_event::print(FILE* file,
PRINT_EVENT_INFO* print_event_info, PRINT_EVENT_INFO* print_event_info,
const char *local_fname) const char *local_fname)
{ {
print_query_header(file, print_event_info); Write_on_release_cache cache(&print_event_info->head_cache, file);
print_query_header(&cache, print_event_info);
if (local_fname) if (local_fname)
{ {
my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME)); my_b_write(&cache, (byte*) query, fn_pos_start);
fprintf(file, " LOCAL INFILE \'"); my_b_printf(&cache, " LOCAL INFILE \'");
fprintf(file, local_fname); my_b_printf(&cache, local_fname);
fprintf(file, "\'"); my_b_printf(&cache, "\'");
if (dup_handling == LOAD_DUP_REPLACE) if (dup_handling == LOAD_DUP_REPLACE)
fprintf(file, " REPLACE"); my_b_printf(&cache, " REPLACE");
fprintf(file, " INTO"); my_b_printf(&cache, " INTO");
my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end, my_b_write(&cache, (byte*) query + fn_pos_end, q_len-fn_pos_end);
MYF(MY_NABP | MY_WME)); my_b_printf(&cache, ";\n");
fprintf(file, ";\n");
} }
else else
{ {
my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); my_b_write(&cache, (byte*) query, q_len);
fprintf(file, ";\n"); my_b_printf(&cache, ";\n");
} }
if (!print_event_info->short_form) if (!print_event_info->short_form)
fprintf(file, "# file_id: %d \n", file_id); my_b_printf(&cache, "# file_id: %d \n", file_id);
} }
#endif #endif
...@@ -5797,6 +5951,31 @@ void Rows_log_event::pack_info(Protocol *protocol) ...@@ -5797,6 +5951,31 @@ void Rows_log_event::pack_info(Protocol *protocol)
} }
#endif #endif
#ifdef MYSQL_CLIENT
void Rows_log_event::print_helper(FILE *file,
PRINT_EVENT_INFO *print_event_info,
char const *const name)
{
IO_CACHE *const head= &print_event_info->head_cache;
IO_CACHE *const body= &print_event_info->body_cache;
if (!print_event_info->short_form)
{
bool const last_stmt_event= get_flags(STMT_END_F);
print_header(head, print_event_info, !last_stmt_event);
my_b_printf(head, "\t%s: table id %lu", name, m_table_id);
print_base64(body, print_event_info, !last_stmt_event);
}
if (get_flags(STMT_END_F))
{
my_b_copy_to_file(head, file);
my_b_copy_to_file(body, file);
reinit_io_cache(head, WRITE_CACHE, 0, FALSE, TRUE);
reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
}
}
#endif
/************************************************************************** /**************************************************************************
Table_map_log_event member functions and support functions Table_map_log_event member functions and support functions
**************************************************************************/ **************************************************************************/
...@@ -6148,10 +6327,11 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) ...@@ -6148,10 +6327,11 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
{ {
if (!print_event_info->short_form) if (!print_event_info->short_form)
{ {
print_header(file, print_event_info); print_header(&print_event_info->head_cache, print_event_info, TRUE);
fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n", my_b_printf(&print_event_info->head_cache,
"\tTable_map: `%s`.`%s` mapped to number %lu\n",
m_dbnam, m_tblnam, m_table_id); m_dbnam, m_tblnam, m_table_id);
print_base64(file, print_event_info); print_base64(&print_event_info->body_cache, print_event_info, TRUE);
} }
} }
#endif #endif
...@@ -6513,12 +6693,7 @@ int Write_rows_log_event::do_exec_row(TABLE *table) ...@@ -6513,12 +6693,7 @@ int Write_rows_log_event::do_exec_row(TABLE *table)
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
{ {
if (!print_event_info->short_form) Rows_log_event::print_helper(file, print_event_info, "Write_rows");
{
print_header(file, print_event_info);
fprintf(file, "\tWrite_rows: table id %lu", m_table_id);
print_base64(file, print_event_info);
}
} }
#endif #endif
...@@ -6846,12 +7021,7 @@ int Delete_rows_log_event::do_exec_row(TABLE *table) ...@@ -6846,12 +7021,7 @@ int Delete_rows_log_event::do_exec_row(TABLE *table)
void Delete_rows_log_event::print(FILE *file, void Delete_rows_log_event::print(FILE *file,
PRINT_EVENT_INFO* print_event_info) PRINT_EVENT_INFO* print_event_info)
{ {
if (!print_event_info->short_form) Rows_log_event::print_helper(file, print_event_info, "Delete_rows");
{
print_header(file, print_event_info);
fprintf(file, "\tDelete_rows: table id %lu", m_table_id);
print_base64(file, print_event_info);
}
} }
#endif #endif
...@@ -7017,12 +7187,7 @@ int Update_rows_log_event::do_exec_row(TABLE *table) ...@@ -7017,12 +7187,7 @@ int Update_rows_log_event::do_exec_row(TABLE *table)
void Update_rows_log_event::print(FILE *file, void Update_rows_log_event::print(FILE *file,
PRINT_EVENT_INFO* print_event_info) PRINT_EVENT_INFO* print_event_info)
{ {
if (!print_event_info->short_form) Rows_log_event::print_helper(file, print_event_info, "Update_rows");
{
print_header(file, print_event_info);
fprintf(file, "\tUpdate_rows: table id %lu", m_table_id);
print_base64(file, print_event_info);
}
} }
#endif #endif
......
...@@ -519,14 +519,30 @@ typedef struct st_print_event_info ...@@ -519,14 +519,30 @@ typedef struct st_print_event_info
bzero(db, sizeof(db)); bzero(db, sizeof(db));
bzero(charset, sizeof(charset)); bzero(charset, sizeof(charset));
bzero(time_zone_str, sizeof(time_zone_str)); bzero(time_zone_str, sizeof(time_zone_str));
uint const flags = MYF(MY_WME | MY_NABP);
init_io_cache(&head_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags);
init_io_cache(&body_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags);
} }
~st_print_event_info() {
end_io_cache(&head_cache);
end_io_cache(&body_cache);
}
/* Settings on how to print the events */ /* Settings on how to print the events */
bool short_form; bool short_form;
bool base64_output; bool base64_output;
my_off_t hexdump_from; my_off_t hexdump_from;
uint8 common_header_len; uint8 common_header_len;
/*
These two caches are used by the row-based replication events to
collect the header information and the main body of the events
making up a statement.
*/
IO_CACHE head_cache;
IO_CACHE body_cache;
} PRINT_EVENT_INFO; } PRINT_EVENT_INFO;
#endif #endif
...@@ -637,9 +653,11 @@ public: ...@@ -637,9 +653,11 @@ public:
const Format_description_log_event *description_event); const Format_description_log_event *description_event);
/* print*() functions are used by mysqlbinlog */ /* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
void print_timestamp(FILE* file, time_t *ts = 0); void print_timestamp(IO_CACHE* file, time_t *ts = 0);
void print_header(FILE* file, PRINT_EVENT_INFO* print_event_info); void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
void print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info); bool is_more);
void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
bool is_more);
#endif #endif
static void *operator new(size_t size) static void *operator new(size_t size)
...@@ -804,7 +822,7 @@ public: ...@@ -804,7 +822,7 @@ public:
uint32 q_len_arg); uint32 q_len_arg);
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
#else #else
void print_query_header(FILE* file, PRINT_EVENT_INFO* print_event_info); void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info);
void print(FILE* file, PRINT_EVENT_INFO* print_event_info); void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif #endif
...@@ -1864,6 +1882,10 @@ protected: ...@@ -1864,6 +1882,10 @@ protected:
Log_event_type event_type, Log_event_type event_type,
const Format_description_log_event *description_event); const Format_description_log_event *description_event);
#ifdef MYSQL_CLIENT
void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name);
#endif
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
virtual int do_add_row_data(byte *data, my_size_t length); virtual int do_add_row_data(byte *data, my_size_t length);
#endif #endif
......
...@@ -5968,6 +5968,9 @@ ER_CANT_ACTIVATE_LOG ...@@ -5968,6 +5968,9 @@ ER_CANT_ACTIVATE_LOG
ger "Kann Logdatei '%-.64s' nicht aktivieren" ger "Kann Logdatei '%-.64s' nicht aktivieren"
ER_RBR_NOT_AVAILABLE ER_RBR_NOT_AVAILABLE
eng "The server was not built with row-based replication" eng "The server was not built with row-based replication"
ER_BASE64_DECODE_ERROR
eng "Decoding of base64 string failed"
swe "Avkodning av base64 strng misslyckades"
ger "Der Server hat keine zeilenbasierte Replikation" ger "Der Server hat keine zeilenbasierte Replikation"
ER_NO_TRIGGERS_ON_SYSTEM_SCHEMA ER_NO_TRIGGERS_ON_SYSTEM_SCHEMA
eng "Triggers can not be created on system tables" eng "Triggers can not be created on system tables"
......
...@@ -3101,17 +3101,22 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ...@@ -3101,17 +3101,22 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT)) type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
{ {
DBUG_PRINT("info", ("event skipped")); DBUG_PRINT("info", ("event skipped"));
if (thd->options & OPTION_BEGIN) /*
We only skip the event here and do not increase the group log
position. In the event that we have to restart, this means
that we might have to skip the event again, but that is a
minor issue.
If we were to increase the group log position when skipping an
event, it might be that we are restarting at the wrong
position and have events before that we should have executed,
so not increasing the group log position is a sure bet in this
case.
In this way, we just step the group log position when we
*know* that we are at the end of a group.
*/
rli->inc_event_relay_log_pos(); rli->inc_event_relay_log_pos();
else
{
rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT ||
type_code == STOP_EVENT ||
type_code == FORMAT_DESCRIPTION_EVENT) ?
LL(0) : ev->log_pos,
1/* skip lock*/);
flush_relay_log_info(rli);
}
/* /*
Protect against common user error of setting the counter to 1 Protect against common user error of setting the counter to 1
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
void mysql_client_binlog_statement(THD* thd) void mysql_client_binlog_statement(THD* thd)
{ {
DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'", DBUG_PRINT("info",("binlog base64: '%*s'",
(thd->lex->comment.length < 2048 ? (thd->lex->comment.length < 2048 ?
thd->lex->comment.length : 2048), thd->lex->comment.length : 2048),
...@@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd)
my_bool nsok= thd->net.no_send_ok; my_bool nsok= thd->net.no_send_ok;
thd->net.no_send_ok= TRUE; thd->net.no_send_ok= TRUE;
const my_size_t coded_len= thd->lex->comment.length + 1; my_size_t coded_len= thd->lex->comment.length + 1;
const my_size_t event_len= base64_needed_decoded_length(coded_len); my_size_t decoded_len= base64_needed_decoded_length(coded_len);
DBUG_ASSERT(coded_len > 0); DBUG_ASSERT(coded_len > 0);
/* /*
...@@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd)
new Format_description_log_event(4); new Format_description_log_event(4);
const char *error= 0; const char *error= 0;
char *buf= (char *) my_malloc(event_len, MYF(MY_WME)); char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
Log_event *ev = 0; Log_event *ev = 0;
int res;
/* /*
Out of memory check Out of memory check
...@@ -73,45 +73,99 @@ void mysql_client_binlog_statement(THD* thd) ...@@ -73,45 +73,99 @@ void mysql_client_binlog_statement(THD* thd)
thd->rli_fake->sql_thd= thd; thd->rli_fake->sql_thd= thd;
thd->rli_fake->no_storage= TRUE; thd->rli_fake->no_storage= TRUE;
res= base64_decode(thd->lex->comment.str, coded_len, buf); for (char const *strptr= thd->lex->comment.str ;
strptr < thd->lex->comment.str + thd->lex->comment.length ; )
{
char const *endptr= 0;
int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr);
DBUG_PRINT("info",
("bytes_decoded=%d; strptr=0x%lu; endptr=0x%lu ('%c':%d)",
bytes_decoded, strptr, endptr, *endptr, *endptr));
if (bytes_decoded < 0)
{
my_error(ER_BASE64_DECODE_ERROR, MYF(0));
goto end;
}
else if (bytes_decoded == 0)
break; // If no bytes where read, the string contained only whitespace
DBUG_ASSERT(bytes_decoded > 0);
DBUG_ASSERT(endptr > strptr);
coded_len-= endptr - strptr;
strptr= endptr;
/*
Now we have one or more events stored in the buffer. The size of
the buffer is computed based on how much base64-encoded data
there were, so there should be ample space for the data (maybe
even too much, since a statement can consist of a considerable
number of events).
TODO: Switch to use a stream-based base64 encoder/decoder in
order to be able to read exactly what is necessary.
*/
DBUG_PRINT("info",("binlog base64 decoded_len=%d, bytes_decoded=%d",
decoded_len, bytes_decoded));
DBUG_PRINT("info",("binlog base64 decoded_len=%d, event_len=%d\n",
res, uint4korr(buf + EVENT_LEN_OFFSET)));
/* /*
Note that 'res' is the correct event length, 'event_len' was Now we start to read events of the buffer, until there are no
calculated based on the base64-string that possibly contained more.
extra spaces, so it can be longer than the real event.
*/ */
if (res < EVENT_LEN_OFFSET for (char *bufptr= buf ; bytes_decoded > 0 ; )
|| (uint) res != uint4korr(buf+EVENT_LEN_OFFSET)) {
/*
Checking that the first event in the buffer is not truncated.
*/
ulong event_len= uint4korr(bufptr + EVENT_LEN_OFFSET);
DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
event_len, bytes_decoded));
if (bytes_decoded < EVENT_LEN_OFFSET || (uint) bytes_decoded < event_len)
{ {
my_error(ER_SYNTAX_ERROR, MYF(0)); my_error(ER_SYNTAX_ERROR, MYF(0));
goto end; goto end;
} }
ev= Log_event::read_log_event(buf, res, &error, desc); ev= Log_event::read_log_event(bufptr, event_len, &error, desc);
DBUG_PRINT("info",("binlog base64 err=%s", error)); DBUG_PRINT("info",("binlog base64 err=%s", error));
if (!ev) if (!ev)
{ {
/* /*
This could actually be an out-of-memory, but it is more This could actually be an out-of-memory, but it is more likely
likely causes by a bad statement causes by a bad statement
*/ */
my_error(ER_SYNTAX_ERROR, MYF(0)); my_error(ER_SYNTAX_ERROR, MYF(0));
goto end; goto end;
} }
DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); bytes_decoded -= event_len;
DBUG_PRINT("info",("buf+EVENT_TYPE_OFFSET=%d", buf+EVENT_TYPE_OFFSET)); bufptr += event_len;
DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
DBUG_PRINT("info",("bufptr+EVENT_TYPE_OFFSET=0x%lx",
bufptr+EVENT_TYPE_OFFSET));
DBUG_PRINT("info", ("bytes_decoded=%d; bufptr=0x%lx; buf[EVENT_LEN_OFFSET]=%u",
bytes_decoded, bufptr, uint4korr(bufptr+EVENT_LEN_OFFSET)));
ev->thd= thd; ev->thd= thd;
if (ev->exec_event(thd->rli_fake)) if (int err= ev->exec_event(thd->rli_fake))
{ {
DBUG_PRINT("info", ("exec_event() - error=%d", error));
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.
*/
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
goto end; goto end;
} }
delete ev;
ev= 0;
}
}
/* /*
Restore setting of no_send_ok Restore setting of no_send_ok
*/ */
...@@ -126,10 +180,7 @@ end: ...@@ -126,10 +180,7 @@ end:
*/ */
thd->net.no_send_ok= nsok; thd->net.no_send_ok= nsok;
if (ev)
delete ev;
if (desc)
delete desc; delete desc;
if (buf) my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
my_free(buf, MYF(0)); DBUG_VOID_RETURN;
} }
...@@ -930,6 +930,7 @@ public: ...@@ -930,6 +930,7 @@ public:
/* /*
Public interface to write RBR events to the binlog Public interface to write RBR events to the binlog
*/ */
void binlog_start_trans_and_stmt();
int binlog_write_table_map(TABLE *table, bool is_transactional); int binlog_write_table_map(TABLE *table, bool is_transactional);
int binlog_write_row(TABLE* table, bool is_transactional, int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, my_size_t colcnt, MY_BITMAP const* cols, my_size_t colcnt,
......
...@@ -2033,6 +2033,10 @@ err: ...@@ -2033,6 +2033,10 @@ err:
rolled back. We only need to roll back a potential statement rolled back. We only need to roll back a potential statement
transaction, since real transactions are rolled back in transaction, since real transactions are rolled back in
close_thread_tables(). close_thread_tables().
TODO: This is not true any more, table maps are generated on the
first call to ha_*_row() instead. Remove code that are used to
cover for the case outlined above.
*/ */
ha_rollback_stmt(thd); ha_rollback_stmt(thd);
...@@ -2402,6 +2406,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2402,6 +2406,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
DBUG_ENTER("select_insert::prepare"); DBUG_ENTER("select_insert::prepare");
unit= u; unit= u;
/* /*
Since table in which we are going to insert is added to the first Since table in which we are going to insert is added to the first
select, LEX::current_select should point to the first select while select, LEX::current_select should point to the first select while
...@@ -2631,20 +2636,26 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2631,20 +2636,26 @@ void select_insert::send_error(uint errcode,const char *err)
if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
my_message(errcode, err, MYF(0)); my_message(errcode, err, MYF(0));
if (!table) /*
If the creation of the table failed (due to a syntax error, for
example), no table will have been opened and therefore 'table'
will be NULL. In that case, we still need to execute the rollback
and the end of the function to truncate the binary log, but we can
skip all the intermediate steps.
*/
if (table)
{ {
/* /*
This can only happen when using CREATE ... SELECT and the table was not If we are not in prelocked mode, we end the bulk insert started
created becasue of an syntax error before.
*/ */
DBUG_VOID_RETURN;
}
if (!thd->prelocked_mode) if (!thd->prelocked_mode)
table->file->ha_end_bulk_insert(); table->file->ha_end_bulk_insert();
/* /*
If at least one row has been inserted/modified and will stay in the table If at least one row has been inserted/modified and will stay in
(the table doesn't have transactions) we must write to the binlog (and the table (the table doesn't have transactions) we must write to
the error code will make the slave stop). the binlog (and the error code will make the slave stop).
For many errors (example: we got a duplicate key error while For many errors (example: we got a duplicate key error while
inserting into a MyISAM table), no row will be added to the table, inserting into a MyISAM table), no row will be added to the table,
...@@ -2652,22 +2663,12 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2652,22 +2663,12 @@ void select_insert::send_error(uint errcode,const char *err)
be an error code mismatch (the inserts will succeed on the slave be an error code mismatch (the inserts will succeed on the slave
with no error). with no error).
If we are using row-based replication we have two cases where this If table creation failed, the number of rows modified will also be
code is executed: replication of CREATE-SELECT and replication of zero, so no check for that is made.
INSERT-SELECT.
When replicating a CREATE-SELECT statement, we shall not write the
events to the binary log and should thus not set
OPTION_STATUS_NO_TRANS_UPDATE.
When replicating INSERT-SELECT, we shall not write the events to
the binary log for transactional table, but shall write all events
if there is one or more writes to non-transactional tables. In
this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
write to a non-transactional table, otherwise it is cleared.
*/ */
if (info.copied || info.deleted || info.updated) if (info.copied || info.deleted || info.updated)
{ {
DBUG_ASSERT(table != NULL);
if (!table->file->has_transactions()) if (!table->file->has_transactions())
{ {
if (mysql_bin_log.is_open()) if (mysql_bin_log.is_open())
...@@ -2681,8 +2682,10 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2681,8 +2682,10 @@ void select_insert::send_error(uint errcode,const char *err)
query_cache_invalidate3(thd, table, 1); query_cache_invalidate3(thd, table, 1);
} }
} }
ha_rollback_stmt(thd);
table->file->ha_release_auto_increment(); table->file->ha_release_auto_increment();
}
ha_rollback_stmt(thd);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2690,8 +2693,11 @@ void select_insert::send_error(uint errcode,const char *err) ...@@ -2690,8 +2693,11 @@ void select_insert::send_error(uint errcode,const char *err)
bool select_insert::send_eof() bool select_insert::send_eof()
{ {
int error,error2; int error,error2;
bool const trans_table= table->file->has_transactions();
ulonglong id; ulonglong id;
DBUG_ENTER("select_insert::send_eof"); DBUG_ENTER("select_insert::send_eof");
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type()));
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0; error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
...@@ -2711,9 +2717,8 @@ bool select_insert::send_eof() ...@@ -2711,9 +2717,8 @@ bool select_insert::send_eof()
are not logged in RBR) are not logged in RBR)
- We are using statement based replication - We are using statement based replication
*/ */
if (!table->file->has_transactions() && if (!trans_table &&
(!table->s->tmp_table || (!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
!thd->current_stmt_binlog_row_based))
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
} }
...@@ -2729,11 +2734,22 @@ bool select_insert::send_eof() ...@@ -2729,11 +2734,22 @@ bool select_insert::send_eof()
thd->clear_error(); thd->clear_error();
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query, thd->query_length, thd->query, thd->query_length,
table->file->has_transactions(), FALSE); trans_table, FALSE);
} }
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) /*
We will call ha_autocommit_or_rollback() also for
non-transactional tables under row-based replication: there might
be events in the binary logs transaction, and we need to write
them to the binary log.
*/
if (trans_table || thd->current_stmt_binlog_row_based)
{
int const error2= ha_autocommit_or_rollback(thd, error);
if (error2 && !error)
error=error2; error=error2;
}
table->file->ha_release_auto_increment(); table->file->ha_release_auto_increment();
if (error) if (error)
{ {
table->file->print_error(error,MYF(0)); table->file->print_error(error,MYF(0));
...@@ -2930,14 +2946,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2930,14 +2946,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
class MY_HOOKS : public TABLEOP_HOOKS { class MY_HOOKS : public TABLEOP_HOOKS {
public: public:
MY_HOOKS(select_create *x) : ptr(x) { } MY_HOOKS(select_create *x) : ptr(x) { }
private:
virtual void do_prelock(TABLE **tables, uint count) virtual void do_prelock(TABLE **tables, uint count)
{ {
TABLE const *const table = *tables;
if (ptr->get_thd()->current_stmt_binlog_row_based && if (ptr->get_thd()->current_stmt_binlog_row_based &&
!(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) table->s->tmp_table == NO_TMP_TABLE &&
!ptr->get_create_info()->table_existed)
{
ptr->binlog_show_create_table(tables, count); ptr->binlog_show_create_table(tables, count);
} }
}
private:
select_create *ptr; select_create *ptr;
}; };
...@@ -2946,6 +2967,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2946,6 +2967,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
#endif #endif
unit= u; unit= u;
#ifdef HAVE_ROW_BASED_REPLICATION
/*
Start a statement transaction before the create if we are creating
a non-temporary table and are using row-based replication for the
statement.
*/
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
thd->current_stmt_binlog_row_based)
{
thd->binlog_start_trans_and_stmt();
}
#endif
if (!(table= create_table_from_items(thd, create_info, create_table, if (!(table= create_table_from_items(thd, create_info, create_table,
extra_fields, keys, &values, extra_fields, keys, &values,
&thd->extra_lock, hook_ptr))) &thd->extra_lock, hook_ptr)))
...@@ -3093,8 +3128,17 @@ void select_create::abort() ...@@ -3093,8 +3128,17 @@ void select_create::abort()
table->s->version= 0; table->s->version= 0;
hash_delete(&open_cache,(byte*) table); hash_delete(&open_cache,(byte*) table);
if (!create_info->table_existed) if (!create_info->table_existed)
{
quick_rm_table(table_type, create_table->db, quick_rm_table(table_type, create_table->db,
create_table->table_name, 0); create_table->table_name, 0);
/*
We roll back the statement, including truncating the
transaction cache of the binary log, if the statement
failed.
*/
if (thd->current_stmt_binlog_row_based)
ha_rollback_stmt(thd);
}
/* Tell threads waiting for refresh that something has happened */ /* Tell threads waiting for refresh that something has happened */
if (version != refresh_version) if (version != refresh_version)
broadcast_refresh(); broadcast_refresh();
......
...@@ -2013,7 +2013,7 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) { ...@@ -2013,7 +2013,7 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) {
break; break;
void *tmp_data = malloc(base64_needed_decoded_length((size_t) (len - 1))); void *tmp_data = malloc(base64_needed_decoded_length((size_t) (len - 1)));
const int res = base64_decode(buf64, len-1, tmp_data); const int res = base64_decode(buf64, len-1, tmp_data, NULL);
delete[] buf64; delete[] buf64;
UtilBuffer tmp; UtilBuffer tmp;
tmp.append((void *) tmp_data, res); tmp.append((void *) tmp_data, res);
......
...@@ -54,7 +54,7 @@ main(void) ...@@ -54,7 +54,7 @@ main(void)
/* Decode */ /* Decode */
dst= (char *) malloc(base64_needed_decoded_length(strlen(str))); dst= (char *) malloc(base64_needed_decoded_length(strlen(str)));
dst_len= base64_decode(str, strlen(str), dst); dst_len= base64_decode(str, strlen(str), dst, NULL);
ok(dst_len == src_len, "Comparing lengths"); ok(dst_len == src_len, "Comparing lengths");
cmp= memcmp(src, dst, src_len); cmp= memcmp(src, dst, src_len);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment