Commit 41d68cab authored by Sergei Golubchik's avatar Sergei Golubchik

cleanup: Log_event::write() and MYSQL_BIN_LOG::write_cache()

Introduce Log_event_writer() that encapsulates
writing data to an IO_CACHE with automatic checksum calculation.

Now all events properly checksum themselves as needed.

Use Log_event_writer in MYSQL_BIN_LOG::write_cache() instead
of copy-pasting its logic all over.

Later Log_event_writer will also do encryption.
parent 704ba5c5
...@@ -3461,7 +3461,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -3461,7 +3461,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (!s.is_valid()) if (!s.is_valid())
goto err; goto err;
s.dont_set_created= null_created_arg; s.dont_set_created= null_created_arg;
if (s.write(&log_file)) if (write_event(&s))
goto err; goto err;
bytes_written+= s.data_written; bytes_written+= s.data_written;
...@@ -3504,7 +3504,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -3504,7 +3504,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
*/ */
Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0); Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
if (gl_ev.write(&log_file)) if (write_event(&gl_ev))
goto err; goto err;
/* Output a binlog checkpoint event at the start of the binlog file. */ /* Output a binlog checkpoint event at the start of the binlog file. */
...@@ -3555,7 +3555,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -3555,7 +3555,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
flush_io_cache(&log_file); flush_io_cache(&log_file);
mysql_file_sync(log_file.file, MYF(MY_WME)); mysql_file_sync(log_file.file, MYF(MY_WME));
DBUG_SUICIDE();); DBUG_SUICIDE(););
if (ev.write(&log_file)) if (write_event(&ev))
goto err; goto err;
bytes_written+= ev.data_written; bytes_written+= ev.data_written;
} }
...@@ -3587,7 +3587,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -3587,7 +3587,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Don't set log_pos in event header */ /* Don't set log_pos in event header */
description_event_for_queue->set_artificial_event(); description_event_for_queue->set_artificial_event();
if (description_event_for_queue->write(&log_file)) if (write_event(description_event_for_queue))
goto err; goto err;
bytes_written+= description_event_for_queue->data_written; bytes_written+= description_event_for_queue->data_written;
} }
...@@ -4991,7 +4991,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) ...@@ -4991,7 +4991,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
r.checksum_alg= relay_log_checksum_alg; r.checksum_alg= relay_log_checksum_alg;
DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) || if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
(error= r.write(&log_file))) (error= write_event(&r)))
{ {
DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;); DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;);
close_on_error= TRUE; close_on_error= TRUE;
...@@ -5105,9 +5105,13 @@ end: ...@@ -5105,9 +5105,13 @@ end:
DBUG_RETURN(error); DBUG_RETURN(error);
} }
bool MYSQL_BIN_LOG::write_event(Log_event *ev, IO_CACHE *file)
{
Log_event_writer writer(file);
return writer.write(ev);
}
bool bool MYSQL_BIN_LOG::append(Log_event *ev)
MYSQL_BIN_LOG::append(Log_event *ev)
{ {
bool res; bool res;
mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_log);
...@@ -5124,11 +5128,8 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) ...@@ -5124,11 +5128,8 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Log_event::write() is smart enough to use my_b_write() or if (write_event(ev))
my_b_append() depending on the kind of cache we have.
*/
if (ev->write(&log_file))
{ {
error=1; error=1;
goto err; goto err;
...@@ -5516,15 +5517,16 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, ...@@ -5516,15 +5517,16 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
IO_CACHE *file= IO_CACHE *file=
cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional)); cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional));
Log_event_writer writer(file);
if (with_annotate && *with_annotate) if (with_annotate && *with_annotate)
{ {
Annotate_rows_log_event anno(table->in_use, is_transactional, false); Annotate_rows_log_event anno(table->in_use, is_transactional, false);
/* Annotate event should be written not more than once */ /* Annotate event should be written not more than once */
*with_annotate= 0; *with_annotate= 0;
if ((error= anno.write(file))) if ((error= writer.write(&anno)))
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if ((error= the_event.write(file))) if ((error= writer.write(&the_event)))
DBUG_RETURN(error); DBUG_RETURN(error);
binlog_table_maps++; binlog_table_maps++;
...@@ -5651,14 +5653,14 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, ...@@ -5651,14 +5653,14 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (Rows_log_event* pending= cache_data->pending()) if (Rows_log_event* pending= cache_data->pending())
{ {
IO_CACHE *file= &cache_data->cache_log; Log_event_writer writer(&cache_data->cache_log);
/* /*
Write pending event to the cache. Write pending event to the cache.
*/ */
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending", DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
{DBUG_SET("+d,simulate_file_write_error");}); {DBUG_SET("+d,simulate_file_write_error");});
if (pending->write(file)) if (writer.write(pending))
{ {
set_write_error(thd, is_transactional); set_write_error(thd, is_transactional);
if (check_write_error(thd) && cache_data && if (check_write_error(thd) && cache_data &&
...@@ -5746,7 +5748,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, ...@@ -5746,7 +5748,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
commit_id); commit_id);
/* Write the event to the binary log. */ /* Write the event to the binary log. */
if (gtid_event.write(&mysql_bin_log.log_file)) DBUG_ASSERT(this == &mysql_bin_log);
if (write_event(&gtid_event))
DBUG_RETURN(true); DBUG_RETURN(true);
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
...@@ -6080,7 +6083,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6080,7 +6083,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Annotate_rows_log_event anno(thd, using_trans, direct); Annotate_rows_log_event anno(thd, using_trans, direct);
/* Annotate event should be written not more than once */ /* Annotate event should be written not more than once */
*with_annotate= 0; *with_annotate= 0;
if (anno.write(file)) if (write_event(&anno, file))
goto err; goto err;
} }
...@@ -6094,7 +6097,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6094,7 +6097,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog, thd->first_successful_insert_id_in_prev_stmt_for_binlog,
using_trans, direct); using_trans, direct);
if (e.write(file)) if (write_event(&e, file))
goto err; goto err;
} }
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
...@@ -6105,14 +6108,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6105,14 +6108,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT, Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog. thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum(), using_trans, direct); minimum(), using_trans, direct);
if (e.write(file)) if (write_event(&e, file))
goto err; goto err;
} }
if (thd->rand_used) if (thd->rand_used)
{ {
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2, Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
using_trans, direct); using_trans, direct);
if (e.write(file)) if (write_event(&e, file))
goto err; goto err;
} }
if (thd->user_var_events.elements) if (thd->user_var_events.elements)
...@@ -6136,7 +6139,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6136,7 +6139,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
flags, flags,
using_trans, using_trans,
direct); direct);
if (e.write(file)) if (write_event(&e, file))
goto err; goto err;
} }
} }
...@@ -6146,7 +6149,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6146,7 +6149,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
/* /*
Write the event. Write the event.
*/ */
if (event_info->write(file) || if (write_event(event_info, file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
goto err; goto err;
...@@ -6525,33 +6528,34 @@ uint MYSQL_BIN_LOG::next_file_id() ...@@ -6525,33 +6528,34 @@ uint MYSQL_BIN_LOG::next_file_id()
return res; return res;
} }
class CacheWriter: public Log_event_writer
{
public:
ulong remains;
/** CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum)
Calculate checksum of possibly a part of an event containing at least : Log_event_writer(file_arg), remains(0), thd(thd_arg), first(true)
the whole common header. { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; }
@param buf the pointer to trans cache's buffer
@param off the offset of the beginning of the event in the buffer
@param event_len no-checksum length of the event
@param length the current size of the buffer
@param crc [in-out] the checksum
Event size in incremented by @c BINLOG_CHECKSUM_LEN. ~CacheWriter()
{ status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }
@return 0 or number of unprocessed yet bytes of the event excluding int write(uchar* pos, size_t len)
the checksum part. {
*/ if (first)
static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len, write_header(pos, len);
uint length, ha_checksum *crc) else
{ write_data(pos, len);
ulong ret;
uchar *event_begin= buf + off;
ret= length >= off + event_len ? 0 : off + event_len - length; remains -= len;
*crc= my_checksum(*crc, event_begin, event_len - ret); if ((first= !remains))
return ret; write_footer();
} return 0;
}
private:
THD *thd;
bool first;
};
/* /*
Write the contents of a cache to the binary log. Write the contents of a cache to the binary log.
...@@ -6572,21 +6576,19 @@ uint MYSQL_BIN_LOG::next_file_id() ...@@ -6572,21 +6576,19 @@ uint MYSQL_BIN_LOG::next_file_id()
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
ulong remains= 0; // part of unprocessed yet netto length of the event
long val; long val;
ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN]; uchar header[LOG_EVENT_HEADER_LEN];
ha_checksum crc= 0; CacheWriter writer(thd, &log_file, binlog_checksum_options);
my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
uchar buf[BINLOG_CHECKSUM_LEN];
DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
// while there is just one alg the following must hold: // while there is just one alg the following must hold:
DBUG_ASSERT(!do_checksum || DBUG_ASSERT(binlog_checksum_options == BINLOG_CHECKSUM_ALG_OFF ||
binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32); binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/* /*
...@@ -6615,53 +6617,40 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6615,53 +6617,40 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
if (unlikely(carry > 0)) if (unlikely(carry > 0))
{ {
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
uint tail= LOG_EVENT_HEADER_LEN - carry;
/* assemble both halves */ /* assemble both halves */
memcpy(&header[carry], (char *)cache->read_pos, memcpy(&header[carry], (char *)cache->read_pos, tail);
LOG_EVENT_HEADER_LEN - carry);
ulong len= uint4korr(header + EVENT_LEN_OFFSET);
writer.remains= len;
/* fix end_log_pos */ /* fix end_log_pos */
val= uint4korr(&header[LOG_POS_OFFSET]) + group + end_log_pos_inc += writer.checksum_len;
(end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
int4store(&header[LOG_POS_OFFSET], val); int4store(header + LOG_POS_OFFSET, val);
if (do_checksum) /* fix len */
{ len+= writer.checksum_len;
ulong len= uint4korr(&header[EVENT_LEN_OFFSET]); int4store(header + EVENT_LEN_OFFSET, len);
/* fix len */
int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
}
/* write the first half of the split header */ if (writer.write(header, LOG_EVENT_HEADER_LEN))
if (my_b_write(&log_file, header, carry))
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
status_var_add(thd->status_var.binlog_bytes_written, carry);
/* cache->read_pos+= tail;
copy fixed second half of header to cache so the correct length-= tail;
version will be written later. carry= 0;
*/
memcpy((char *)cache->read_pos, &header[carry],
LOG_EVENT_HEADER_LEN - carry);
/* next event header at ... */ /* next event header at ... */
hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry - hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len;
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
if (do_checksum)
{
DBUG_ASSERT(crc == 0 && remains == 0);
crc= my_checksum(crc, header, carry);
remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
BINLOG_CHECKSUM_LEN;
}
carry= 0;
} }
/* if there is anything to write, process it. */ /* if there is anything to write, process it. */
if (likely(length > 0)) if (likely(length > 0))
{ {
DBUG_EXECUTE_IF("fail_binlog_write_1",
errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
/* /*
process all event-headers in this (partial) cache. process all event-headers in this (partial) cache.
if next header is beyond current read-buffer, if next header is beyond current read-buffer,
...@@ -6669,52 +6658,28 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6669,52 +6658,28 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
very next iteration, just "eventually"). very next iteration, just "eventually").
*/ */
/* crc-calc the whole buffer */ if (hdr_offs >= length)
if (do_checksum && hdr_offs >= length)
{ {
if (writer.write(cache->read_pos, length))
DBUG_ASSERT(remains != 0 && crc != 0);
crc= my_checksum(crc, cache->read_pos, length);
remains -= length;
if (my_b_write(&log_file, cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0)
{
int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= 0;
}
} }
while (hdr_offs < length) while (hdr_offs < length)
{ {
/* /*
partial header only? save what we can get, process once finish off with remains of the last event that crawls
we get the rest. from previous into the current buffer
*/ */
if (writer.remains != 0)
if (do_checksum)
{ {
if (remains != 0) if (writer.write(cache->read_pos, hdr_offs))
{ DBUG_RETURN(ER_ERROR_ON_WRITE);
/*
finish off with remains of the last event that crawls
from previous into the current buffer
*/
DBUG_ASSERT(crc != 0);
crc= my_checksum(crc, cache->read_pos, hdr_offs);
int4store(buf, crc);
remains -= hdr_offs;
DBUG_ASSERT(remains == 0);
if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= 0;
}
} }
/*
partial header only? save what we can get, process once
we get the rest.
*/
if (hdr_offs + LOG_EVENT_HEADER_LEN > length) if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{ {
carry= length - hdr_offs; carry= length - hdr_offs;
...@@ -6725,37 +6690,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6725,37 +6690,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{ {
/* we've got a full event-header, and it came in one piece */ /* we've got a full event-header, and it came in one piece */
uchar *ev= (uchar *)cache->read_pos + hdr_offs; uchar *ev= (uchar *)cache->read_pos + hdr_offs;
uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET; uchar *log_pos= ev + LOG_POS_OFFSET;
end_log_pos_inc += writer.checksum_len;
/* fix end_log_pos */ /* fix end_log_pos */
val= uint4korr(log_pos) + group + val= uint4korr(log_pos) + group + end_log_pos_inc;
(end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(log_pos, val); int4store(log_pos, val);
/* fix CRC */ /* fix length */
if (do_checksum) int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);
{
/* fix length */ writer.remains= ev_len;
int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN); if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs)))
remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len, DBUG_RETURN(ER_ERROR_ON_WRITE);
length, &crc);
if (my_b_write(&log_file, ev,
remains == 0 ? event_len : length - hdr_offs))
DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0)
{
int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= 0; // crc is complete
}
}
/* next event header at ... */ /* next event header at ... */
hdr_offs += event_len; // incr by the netto len hdr_offs += ev_len; // incr by the netto len
DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length); DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length);
} }
} }
...@@ -6769,20 +6722,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6769,20 +6722,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
*/ */
hdr_offs -= length; hdr_offs -= length;
} }
/* Write data to the binary log file */
DBUG_EXECUTE_IF("fail_binlog_write_1",
errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
if (!do_checksum)
if (my_b_write(&log_file, cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
status_var_add(thd->status_var.binlog_bytes_written, length);
} while ((length= my_b_fill(cache))); } while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0); DBUG_ASSERT(carry == 0);
DBUG_ASSERT(!do_checksum || remains == 0); DBUG_ASSERT(!writer.checksum_len || writer.remains == 0);
DBUG_ASSERT(!do_checksum || crc == 0);
DBUG_RETURN(0); // All OK DBUG_RETURN(0); // All OK
} }
...@@ -6827,7 +6770,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd) ...@@ -6827,7 +6770,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
if (likely(is_open())) if (likely(is_open()))
{ {
error= ev.write(&log_file); error= write_event(&ev);
status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
} }
...@@ -6889,7 +6832,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg ...@@ -6889,7 +6832,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg
Otherwise a subsequent log purge could delete binlogs that XA recovery Otherwise a subsequent log purge could delete binlogs that XA recovery
thinks are needed (even though they are not really). thinks are needed (even though they are not really).
*/ */
if (!ev.write(&log_file) && !flush_and_sync(0)) if (!write_event(&ev) && !flush_and_sync(0))
{ {
signal_update(); signal_update();
} }
...@@ -7797,7 +7740,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, ...@@ -7797,7 +7740,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
}); });
if (entry->end_event->write(&log_file)) if (write_event(entry->end_event))
{ {
entry->error_cache= NULL; entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
...@@ -7807,7 +7750,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, ...@@ -7807,7 +7750,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (entry->incident_event) if (entry->incident_event)
{ {
if (entry->incident_event->write(&log_file)) if (write_event(entry->incident_event))
{ {
entry->error_cache= NULL; entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
...@@ -8066,7 +8009,7 @@ void MYSQL_BIN_LOG::close(uint exiting) ...@@ -8066,7 +8009,7 @@ void MYSQL_BIN_LOG::close(uint exiting)
: (enum_binlog_checksum_alg)binlog_checksum_options; : (enum_binlog_checksum_alg)binlog_checksum_options;
DBUG_ASSERT(!is_relay_log || DBUG_ASSERT(!is_relay_log ||
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
s.write(&log_file); write_event(&s);
bytes_written+= s.data_written; bytes_written+= s.data_written;
signal_update(); signal_update();
......
...@@ -737,6 +737,9 @@ public: ...@@ -737,6 +737,9 @@ public:
void stop_union_events(THD *thd); void stop_union_events(THD *thd);
bool is_query_in_union(THD *thd, query_id_t query_id_param); bool is_query_in_union(THD *thd, query_id_t query_id_param);
bool write_event(Log_event *ev, IO_CACHE *file);
bool write_event(Log_event *ev) { return write_event(ev, &log_file); }
/* /*
v stands for vector v stands for vector
invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0) invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
......
...@@ -667,19 +667,6 @@ static void cleanup_load_tmpdir(LEX_STRING *connection_name) ...@@ -667,19 +667,6 @@ static void cleanup_load_tmpdir(LEX_STRING *connection_name)
#endif #endif
/*
write_str()
*/
static bool write_str(IO_CACHE *file, const char *str, uint length)
{
uchar tmp[1];
tmp[0]= (uchar) length;
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
my_b_safe_write(file, (uchar*) str, length));
}
/* /*
read_str() read_str()
*/ */
...@@ -845,8 +832,7 @@ const char* Log_event::get_type_str() ...@@ -845,8 +832,7 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
:log_pos(0), temp_buf(0), exec_time(0), :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg),
crc(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
server_id= thd->variables.server_id; server_id= thd->variables.server_id;
...@@ -870,8 +856,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) ...@@ -870,8 +856,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
*/ */
Log_event::Log_event() Log_event::Log_event()
:temp_buf(0), exec_time(0), flags(0), :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE),
cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
server_id= global_system_variables.server_id; server_id= global_system_variables.server_id;
...@@ -893,7 +878,7 @@ Log_event::Log_event() ...@@ -893,7 +878,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf, Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event) const Format_description_log_event* description_event)
:temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE), :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
thd = 0; thd = 0;
...@@ -1159,26 +1144,51 @@ my_bool Log_event::need_checksum() ...@@ -1159,26 +1144,51 @@ my_bool Log_event::need_checksum()
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size) int Log_event_writer::write_internal(const uchar *pos, size_t len)
{ {
if (need_checksum() && size != 0) if (my_b_safe_write(file, pos, len))
crc= my_checksum(crc, buf, size); return 1;
bytes_written+= len;
return my_b_safe_write(file, buf, size); return 0;
} }
bool Log_event::write_footer(IO_CACHE* file) int Log_event_writer::write_header(uchar *pos, size_t len)
{ {
DBUG_ENTER("write_footer"); DBUG_ENTER("write_footer");
/* /*
(optional) footer contains the checksum value recording checksum of FD event computed with dropped
possibly active LOG_EVENT_BINLOG_IN_USE_F flag.
Similar step at verication: the active flag is dropped before
checksum computing.
*/ */
if (need_checksum()) if (checksum_len)
{
uchar save=pos[FLAGS_OFFSET];
pos[FLAGS_OFFSET]&= ~LOG_EVENT_BINLOG_IN_USE_F;
crc= my_checksum(0, pos, len);
pos[FLAGS_OFFSET]= save;
}
return write_internal(pos, len);
}
int Log_event_writer::write_data(const uchar *pos, size_t len)
{
if (checksum_len)
crc= my_checksum(crc, pos, len);
return write_internal(pos, len);
}
int Log_event_writer::write_footer()
{
DBUG_ENTER("Log_event_writer::write_footer");
if (checksum_len)
{ {
DBUG_PRINT("info", ("Writing checksum")); uchar checksum_buf[BINLOG_CHECKSUM_LEN];
uchar buf[BINLOG_CHECKSUM_LEN]; int4store(checksum_buf, crc);
int4store(buf, crc); if (write_internal(checksum_buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(my_b_safe_write(file, (uchar*) buf, sizeof(buf))); DBUG_RETURN(ER_ERROR_ON_WRITE);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1187,24 +1197,19 @@ bool Log_event::write_footer(IO_CACHE* file) ...@@ -1187,24 +1197,19 @@ bool Log_event::write_footer(IO_CACHE* file)
Log_event::write_header() Log_event::write_header()
*/ */
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) bool Log_event::write_header(ulong event_data_length)
{ {
uchar header[LOG_EVENT_HEADER_LEN]; uchar header[LOG_EVENT_HEADER_LEN];
ulong now; ulong now;
bool ret;
DBUG_ENTER("Log_event::write_header"); DBUG_ENTER("Log_event::write_header");
DBUG_PRINT("enter", ("filepos: %lld length: %lu type: %d", DBUG_PRINT("enter", ("filepos: %lld length: %lu type: %d",
(longlong) my_b_tell(file), event_data_length, (longlong) writer->pos(), event_data_length,
(int) get_type_code())); (int) get_type_code()));
/* Store number of bytes that will be written by this event */ writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0;
data_written= event_data_length + sizeof(header);
if (need_checksum()) /* Store number of bytes that will be written by this event */
{ data_written= event_data_length + sizeof(header) + writer->checksum_len;
crc= 0;
data_written += BINLOG_CHECKSUM_LEN;
}
/* /*
log_pos != 0 if this is relay-log event. In this case we should not log_pos != 0 if this is relay-log event. In this case we should not
...@@ -1226,7 +1231,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) ...@@ -1226,7 +1231,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
(end of this event, that is). (end of this event, that is).
*/ */
log_pos= my_b_safe_tell(file)+data_written; log_pos= writer->pos() + data_written;
} }
now= get_time(); // Query start time now= get_time(); // Query start time
...@@ -1243,36 +1248,10 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) ...@@ -1243,36 +1248,10 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
int4store(header+ SERVER_ID_OFFSET, server_id); int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written); int4store(header+ EVENT_LEN_OFFSET, data_written);
int4store(header+ LOG_POS_OFFSET, log_pos); int4store(header+ LOG_POS_OFFSET, log_pos);
/* int2store(header + FLAGS_OFFSET, flags);
recording checksum of FD event computed with dropped
possibly active LOG_EVENT_BINLOG_IN_USE_F flag. bool ret= writer->write_header(header, sizeof(header));
Similar step at verication: the active flag is dropped before DBUG_RETURN(ret);
checksum computing.
*/
if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT ||
!need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F))
{
int2store(header+ FLAGS_OFFSET, flags);
ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0;
}
else
{
ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0);
if (!ret)
{
flags &= ~LOG_EVENT_BINLOG_IN_USE_F;
int2store(header + FLAGS_OFFSET, flags);
crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags));
flags |= LOG_EVENT_BINLOG_IN_USE_F;
int2store(header + FLAGS_OFFSET, flags);
ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0);
}
if (!ret)
ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags),
sizeof(header)
- (FLAGS_OFFSET + sizeof(flags))) != 0);
}
DBUG_RETURN( ret);
} }
#endif /* !MYSQL_CLIENT */ #endif /* !MYSQL_CLIENT */
...@@ -1664,9 +1643,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ...@@ -1664,9 +1643,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
if (ev) if (ev)
{ {
ev->checksum_alg= alg; ev->checksum_alg= alg;
#ifdef MYSQL_CLIENT
if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
ev->crc= uint4korr(buf + (event_len)); ev->crc= uint4korr(buf + (event_len));
#endif
} }
DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)", DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)",
...@@ -2749,7 +2730,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src, ...@@ -2749,7 +2730,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src,
will print! will print!
*/ */
bool Query_log_event::write(IO_CACHE* file) bool Query_log_event::write()
{ {
uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS]; uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS];
uchar *start, *start_of_status; uchar *start, *start_of_status;
...@@ -2978,14 +2959,13 @@ bool Query_log_event::write(IO_CACHE* file) ...@@ -2978,14 +2959,13 @@ bool Query_log_event::write(IO_CACHE* file)
*/ */
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len; event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
return (write_header(file, event_length) || return write_header(event_length) ||
wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) || write_data(buf, QUERY_HEADER_LEN) ||
write_post_header_for_derived(file) || write_post_header_for_derived() ||
wrapper_my_b_safe_write(file, (uchar*) start_of_status, write_data(start_of_status, (uint) (start-start_of_status)) ||
(uint) (start-start_of_status)) || write_data(safe_str(db), db_len + 1) ||
wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) || write_data(query, q_len) ||
wrapper_my_b_safe_write(file, (uchar*) query, q_len) || write_footer();
write_footer(file)) ? 1 : 0;
} }
/** /**
...@@ -4610,7 +4590,7 @@ Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len, ...@@ -4610,7 +4590,7 @@ Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len,
*/ */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Start_log_event_v3::write(IO_CACHE* file) bool Start_log_event_v3::write()
{ {
char buff[START_V3_HEADER_LEN]; char buff[START_V3_HEADER_LEN];
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
...@@ -4618,9 +4598,9 @@ bool Start_log_event_v3::write(IO_CACHE* file) ...@@ -4618,9 +4598,9 @@ bool Start_log_event_v3::write(IO_CACHE* file)
if (!dont_set_created) if (!dont_set_created)
created= get_time(); // this sets when and when_sec_part as a side effect created= get_time(); // this sets when and when_sec_part as a side effect
int4store(buff + ST_CREATED_OFFSET,created); int4store(buff + ST_CREATED_OFFSET,created);
return (write_header(file, sizeof(buff)) || return write_header(sizeof(buff)) ||
wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) || write_data(buff, sizeof(buff)) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -4933,7 +4913,7 @@ Format_description_log_event(const char* buf, ...@@ -4933,7 +4913,7 @@ Format_description_log_event(const char* buf,
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Format_description_log_event::write(IO_CACHE* file) bool Format_description_log_event::write()
{ {
bool ret; bool ret;
bool no_checksum; bool no_checksum;
...@@ -4981,12 +4961,11 @@ bool Format_description_log_event::write(IO_CACHE* file) ...@@ -4981,12 +4961,11 @@ bool Format_description_log_event::write(IO_CACHE* file)
{ {
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
} }
ret= (write_header(file, rec_size) || ret= write_header(rec_size) ||
wrapper_my_b_safe_write(file, buff, sizeof(buff)) || write_data(buff, sizeof(buff)) ||
wrapper_my_b_safe_write(file, (uchar*)post_header_len, write_data(post_header_len, number_of_event_types) ||
number_of_event_types) || write_data(&checksum_byte, sizeof(checksum_byte)) ||
wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) || write_footer();
write_footer(file));
if (no_checksum) if (no_checksum)
checksum_alg= BINLOG_CHECKSUM_ALG_OFF; checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
return ret; return ret;
...@@ -5332,7 +5311,7 @@ void Load_log_event::pack_info(THD *thd, Protocol *protocol) ...@@ -5332,7 +5311,7 @@ void Load_log_event::pack_info(THD *thd, Protocol *protocol)
Load_log_event::write_data_header() Load_log_event::write_data_header()
*/ */
bool Load_log_event::write_data_header(IO_CACHE* file) bool Load_log_event::write_data_header()
{ {
char buf[LOAD_HEADER_LEN]; char buf[LOAD_HEADER_LEN];
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id); int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
...@@ -5341,7 +5320,7 @@ bool Load_log_event::write_data_header(IO_CACHE* file) ...@@ -5341,7 +5320,7 @@ bool Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len; buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0; return write_data(buf, LOAD_HEADER_LEN) != 0;
} }
...@@ -5349,19 +5328,19 @@ bool Load_log_event::write_data_header(IO_CACHE* file) ...@@ -5349,19 +5328,19 @@ bool Load_log_event::write_data_header(IO_CACHE* file)
Load_log_event::write_data_body() Load_log_event::write_data_body()
*/ */
bool Load_log_event::write_data_body(IO_CACHE* file) bool Load_log_event::write_data_body()
{ {
if (sql_ex.write_data(file)) if (sql_ex.write_data(writer))
return 1; return 1;
if (num_fields && fields && field_lens) if (num_fields && fields && field_lens)
{ {
if (my_b_safe_write(file, (uchar*)field_lens, num_fields) || if (write_data(field_lens, num_fields) ||
my_b_safe_write(file, (uchar*)fields, field_block_len)) write_data(fields, field_block_len))
return 1; return 1;
} }
return (my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) || return (write_data(table_name, table_name_len + 1) ||
my_b_safe_write(file, (uchar*)db, db_len + 1) || write_data(db, db_len + 1) ||
my_b_safe_write(file, (uchar*)fname, fname_len)); write_data(fname, fname_len));
} }
...@@ -6078,15 +6057,14 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len, ...@@ -6078,15 +6057,14 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
*/ */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Rotate_log_event::write(IO_CACHE* file) bool Rotate_log_event::write()
{ {
char buf[ROTATE_HEADER_LEN]; char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos); int8store(buf + R_POS_OFFSET, pos);
return (write_header(file, ROTATE_HEADER_LEN + ident_len) || return (write_header(ROTATE_HEADER_LEN + ident_len) ||
wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) || write_data(buf, ROTATE_HEADER_LEN) ||
wrapper_my_b_safe_write(file, (uchar*) new_log_ident, write_data(new_log_ident, (uint) ident_len) ||
(uint) ident_len) || write_footer());
write_footer(file));
} }
#endif #endif
...@@ -6276,15 +6254,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( ...@@ -6276,15 +6254,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Binlog_checkpoint_log_event::write(IO_CACHE *file) bool Binlog_checkpoint_log_event::write()
{ {
uchar buf[BINLOG_CHECKPOINT_HEADER_LEN]; uchar buf[BINLOG_CHECKPOINT_HEADER_LEN];
int4store(buf, binlog_file_len); int4store(buf, binlog_file_len);
return write_header(file, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
wrapper_my_b_safe_write(file, buf, BINLOG_CHECKPOINT_HEADER_LEN) || write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
wrapper_my_b_safe_write(file, (const uchar *)binlog_file_name, write_data(binlog_file_name, binlog_file_len) ||
binlog_file_len) || write_footer();
write_footer(file);
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
...@@ -6386,7 +6363,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, ...@@ -6386,7 +6363,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
bool bool
Gtid_log_event::write(IO_CACHE *file) Gtid_log_event::write()
{ {
uchar buf[GTID_HEADER_LEN+2]; uchar buf[GTID_HEADER_LEN+2];
size_t write_len; size_t write_len;
...@@ -6404,9 +6381,9 @@ Gtid_log_event::write(IO_CACHE *file) ...@@ -6404,9 +6381,9 @@ Gtid_log_event::write(IO_CACHE *file)
bzero(buf+13, GTID_HEADER_LEN-13); bzero(buf+13, GTID_HEADER_LEN-13);
write_len= GTID_HEADER_LEN; write_len= GTID_HEADER_LEN;
} }
return write_header(file, write_len) || return write_header(write_len) ||
wrapper_my_b_safe_write(file, buf, write_len) || write_data(buf, write_len) ||
write_footer(file); write_footer();
} }
...@@ -6764,7 +6741,7 @@ Gtid_list_log_event::to_packet(String *packet) ...@@ -6764,7 +6741,7 @@ Gtid_list_log_event::to_packet(String *packet)
bool bool
Gtid_list_log_event::write(IO_CACHE *file) Gtid_list_log_event::write()
{ {
char buf[128]; char buf[128];
String packet(buf, sizeof(buf), system_charset_info); String packet(buf, sizeof(buf), system_charset_info);
...@@ -6772,10 +6749,9 @@ Gtid_list_log_event::write(IO_CACHE *file) ...@@ -6772,10 +6749,9 @@ Gtid_list_log_event::write(IO_CACHE *file)
packet.length(0); packet.length(0);
if (to_packet(&packet)) if (to_packet(&packet))
return true; return true;
return return write_header(get_data_size()) ||
write_header(file, get_data_size()) || write_data(packet.ptr(), packet.length()) ||
wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) || write_footer();
write_footer(file);
} }
...@@ -6980,14 +6956,14 @@ const char* Intvar_log_event::get_var_type_name() ...@@ -6980,14 +6956,14 @@ const char* Intvar_log_event::get_var_type_name()
*/ */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Intvar_log_event::write(IO_CACHE* file) bool Intvar_log_event::write()
{ {
uchar buf[9]; uchar buf[9];
buf[I_TYPE_OFFSET]= (uchar) type; buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val); int8store(buf + I_VAL_OFFSET, val);
return (write_header(file, sizeof(buf)) || return write_header(sizeof(buf)) ||
wrapper_my_b_safe_write(file, buf, sizeof(buf)) || write_data(buf, sizeof(buf)) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -7110,14 +7086,14 @@ Rand_log_event::Rand_log_event(const char* buf, ...@@ -7110,14 +7086,14 @@ Rand_log_event::Rand_log_event(const char* buf,
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Rand_log_event::write(IO_CACHE* file) bool Rand_log_event::write()
{ {
uchar buf[16]; uchar buf[16];
int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2); int8store(buf + RAND_SEED2_OFFSET, seed2);
return (write_header(file, sizeof(buf)) || return write_header(sizeof(buf)) ||
wrapper_my_b_safe_write(file, buf, sizeof(buf)) || write_data(buf, sizeof(buf)) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -7236,12 +7212,12 @@ Xid_log_event(const char* buf, ...@@ -7236,12 +7212,12 @@ Xid_log_event(const char* buf,
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Xid_log_event::write(IO_CACHE* file) bool Xid_log_event::write()
{ {
DBUG_EXECUTE_IF("do_not_write_xid", return 0;); DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
return (write_header(file, sizeof(xid)) || return write_header(sizeof(xid)) ||
wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) || write_data((uchar*)&xid, sizeof(xid)) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -7587,7 +7563,7 @@ err: ...@@ -7587,7 +7563,7 @@ err:
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool User_var_log_event::write(IO_CACHE* file) bool User_var_log_event::write()
{ {
char buf[UV_NAME_LEN_SIZE]; char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
...@@ -7642,13 +7618,13 @@ bool User_var_log_event::write(IO_CACHE* file) ...@@ -7642,13 +7618,13 @@ bool User_var_log_event::write(IO_CACHE* file)
/* Length of the whole event */ /* Length of the whole event */
event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
return (write_header(file, event_length) || return write_header(event_length) ||
wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) || write_data(buf, sizeof(buf)) ||
wrapper_my_b_safe_write(file, (uchar*) name, name_len) || write_data(name, name_len) ||
wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) || write_data(buf1, buf1_length) ||
wrapper_my_b_safe_write(file, pos, val_len) || write_data(pos, val_len) ||
wrapper_my_b_safe_write(file, &flags, unsigned_len) || write_data(&flags, unsigned_len) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -7986,13 +7962,13 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex, ...@@ -7986,13 +7962,13 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex,
Create_file_log_event::write_data_body() Create_file_log_event::write_data_body()
*/ */
bool Create_file_log_event::write_data_body(IO_CACHE* file) bool Create_file_log_event::write_data_body()
{ {
bool res; bool res;
if ((res= Load_log_event::write_data_body(file)) || fake_base) if ((res= Load_log_event::write_data_body()) || fake_base)
return res; return res;
return (my_b_safe_write(file, (uchar*) "", 1) || return write_data("", 1) ||
my_b_safe_write(file, (uchar*) block, block_len)); write_data(block, block_len);
} }
...@@ -8000,14 +7976,14 @@ bool Create_file_log_event::write_data_body(IO_CACHE* file) ...@@ -8000,14 +7976,14 @@ bool Create_file_log_event::write_data_body(IO_CACHE* file)
Create_file_log_event::write_data_header() Create_file_log_event::write_data_header()
*/ */
bool Create_file_log_event::write_data_header(IO_CACHE* file) bool Create_file_log_event::write_data_header()
{ {
bool res; bool res;
uchar buf[CREATE_FILE_HEADER_LEN]; uchar buf[CREATE_FILE_HEADER_LEN];
if ((res= Load_log_event::write_data_header(file)) || fake_base) if ((res= Load_log_event::write_data_header()) || fake_base)
return res; return res;
int4store(buf + CF_FILE_ID_OFFSET, file_id); int4store(buf + CF_FILE_ID_OFFSET, file_id);
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0; return write_data(buf, CREATE_FILE_HEADER_LEN) != 0;
} }
...@@ -8015,11 +7991,11 @@ bool Create_file_log_event::write_data_header(IO_CACHE* file) ...@@ -8015,11 +7991,11 @@ bool Create_file_log_event::write_data_header(IO_CACHE* file)
Create_file_log_event::write_base() Create_file_log_event::write_base()
*/ */
bool Create_file_log_event::write_base(IO_CACHE* file) bool Create_file_log_event::write_base()
{ {
bool res; bool res;
fake_base= 1; // pretend we are Load event fake_base= 1; // pretend we are Load event
res= write(file); res= write();
fake_base= 0; fake_base= 0;
return res; return res;
} }
...@@ -8166,6 +8142,7 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -8166,6 +8142,7 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
char *ext; char *ext;
int fd = -1; int fd = -1;
IO_CACHE file; IO_CACHE file;
Log_event_writer lew(&file);
int error = 1; int error = 1;
Relay_log_info const *rli= rgi->rli; Relay_log_info const *rli= rgi->rli;
...@@ -8191,7 +8168,8 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -8191,7 +8168,8 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
// a trick to avoid allocating another buffer // a trick to avoid allocating another buffer
fname= fname_buf; fname= fname_buf;
fname_len= (uint) (strmov(ext, ".data") - fname); fname_len= (uint) (strmov(ext, ".data") - fname);
if (write_base(&file)) writer= &lew;
if (write_base())
{ {
strmov(ext, ".info"); // to have it right in the error message strmov(ext, ".info"); // to have it right in the error message
rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
...@@ -8282,14 +8260,14 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len, ...@@ -8282,14 +8260,14 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len,
*/ */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Append_block_log_event::write(IO_CACHE* file) bool Append_block_log_event::write()
{ {
uchar buf[APPEND_BLOCK_HEADER_LEN]; uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id); int4store(buf + AB_FILE_ID_OFFSET, file_id);
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) || return write_header(APPEND_BLOCK_HEADER_LEN + block_len) ||
wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || write_data(buf, APPEND_BLOCK_HEADER_LEN) ||
wrapper_my_b_safe_write(file, (uchar*) block, block_len) || write_data(block, block_len) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -8442,13 +8420,13 @@ Delete_file_log_event::Delete_file_log_event(const char* buf, uint len, ...@@ -8442,13 +8420,13 @@ Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
*/ */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Delete_file_log_event::write(IO_CACHE* file) bool Delete_file_log_event::write()
{ {
uchar buf[DELETE_FILE_HEADER_LEN]; uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id); int4store(buf + DF_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) || return write_header(sizeof(buf)) ||
wrapper_my_b_safe_write(file, buf, sizeof(buf)) || write_data(buf, sizeof(buf)) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -8542,13 +8520,13 @@ Execute_load_log_event::Execute_load_log_event(const char* buf, uint len, ...@@ -8542,13 +8520,13 @@ Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
*/ */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Execute_load_log_event::write(IO_CACHE* file) bool Execute_load_log_event::write()
{ {
uchar buf[EXEC_LOAD_HEADER_LEN]; uchar buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id); int4store(buf + EL_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) || return write_header(sizeof(buf)) ||
wrapper_my_b_safe_write(file, buf, sizeof(buf)) || write_data(buf, sizeof(buf)) ||
write_footer(file)); write_footer();
} }
#endif #endif
...@@ -8777,14 +8755,14 @@ ulong Execute_load_query_log_event::get_post_header_size_for_derived() ...@@ -8777,14 +8755,14 @@ ulong Execute_load_query_log_event::get_post_header_size_for_derived()
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool bool
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file) Execute_load_query_log_event::write_post_header_for_derived()
{ {
uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
int4store(buf, file_id); int4store(buf, file_id);
int4store(buf + 4, fn_pos_start); int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end); int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling; *(buf + 4 + 4 + 4)= (uchar) dup_handling;
return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
} }
#endif #endif
...@@ -8927,40 +8905,6 @@ Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -8927,40 +8905,6 @@ Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi)
sql_ex_info methods sql_ex_info methods
**************************************************************************/ **************************************************************************/
/*
sql_ex_info::write_data()
*/
bool sql_ex_info::write_data(IO_CACHE* file)
{
if (new_format())
{
return (write_str(file, field_term, (uint) field_term_len) ||
write_str(file, enclosed, (uint) enclosed_len) ||
write_str(file, line_term, (uint) line_term_len) ||
write_str(file, line_start, (uint) line_start_len) ||
write_str(file, escaped, (uint) escaped_len) ||
my_b_safe_write(file,(uchar*) &opt_flags,1));
}
else
{
/**
@todo This is sensitive to field padding. We should write a
char[7], not an old_sql_ex. /sven
*/
old_sql_ex old_ex;
old_ex.field_term= *field_term;
old_ex.enclosed= *enclosed;
old_ex.line_term= *line_term;
old_ex.line_start= *line_start;
old_ex.escaped= *escaped;
old_ex.opt_flags= opt_flags;
old_ex.empty_flags=empty_flags;
return my_b_safe_write(file, (uchar*) &old_ex, sizeof(old_ex)) != 0;
}
}
/* /*
sql_ex_info::init() sql_ex_info::init()
*/ */
...@@ -9011,12 +8955,54 @@ const char *sql_ex_info::init(const char *buf, const char *buf_end, ...@@ -9011,12 +8955,54 @@ const char *sql_ex_info::init(const char *buf, const char *buf_end,
return buf; return buf;
} }
#ifndef MYSQL_CLIENT
/*
write_str()
*/
static bool write_str(Log_event_writer *writer, const char *str, uint length)
{
uchar tmp[1];
tmp[0]= (uchar) length;
return (writer->write_data(tmp, sizeof(tmp)) ||
writer->write_data((uchar*) str, length));
}
/*
sql_ex_info::write_data()
*/
bool sql_ex_info::write_data(Log_event_writer *writer)
{
if (new_format())
{
return write_str(writer, field_term, field_term_len) ||
write_str(writer, enclosed, enclosed_len) ||
write_str(writer, line_term, line_term_len) ||
write_str(writer, line_start, line_start_len) ||
write_str(writer, escaped, escaped_len) ||
writer->write_data((uchar*) &opt_flags, 1);
}
else
{
uchar old_ex[7];
old_ex[0]= *field_term;
old_ex[1]= *enclosed;
old_ex[2]= *line_term;
old_ex[3]= *line_start;
old_ex[4]= *escaped;
old_ex[5]= opt_flags;
old_ex[6]= empty_flags;
return writer->write_data(old_ex, sizeof(old_ex));
}
}
/************************************************************************** /**************************************************************************
Rows_log_event member functions Rows_log_event member functions
**************************************************************************/ **************************************************************************/
#ifndef MYSQL_CLIENT
Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
MY_BITMAP const *cols, bool is_transactional, MY_BITMAP const *cols, bool is_transactional,
Log_event_type event_type) Log_event_type event_type)
...@@ -9965,7 +9951,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -9965,7 +9951,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Rows_log_event::write_data_header(IO_CACHE *file) bool Rows_log_event::write_data_header()
{ {
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
DBUG_ASSERT(m_table_id != ~0UL); DBUG_ASSERT(m_table_id != ~0UL);
...@@ -9973,14 +9959,14 @@ bool Rows_log_event::write_data_header(IO_CACHE *file) ...@@ -9973,14 +9959,14 @@ bool Rows_log_event::write_data_header(IO_CACHE *file)
{ {
int4store(buf + 0, m_table_id); int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags); int2store(buf + 4, m_flags);
return (wrapper_my_b_safe_write(file, buf, 6)); return (write_data(buf, 6));
}); });
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags); int2store(buf + RW_FLAGS_OFFSET, m_flags);
return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN)); return write_data(buf, ROWS_HEADER_LEN);
} }
bool Rows_log_event::write_data_body(IO_CACHE*file) bool Rows_log_event::write_data_body()
{ {
/* /*
Note that this should be the number of *bits*, not the number of Note that this should be the number of *bits*, not the number of
...@@ -9993,11 +9979,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) ...@@ -9993,11 +9979,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap, res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
no_bytes_in_map(&m_cols));
/* /*
TODO[refactor write]: Remove the "down cast" here (and elsewhere). TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/ */
...@@ -10005,11 +9990,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) ...@@ -10005,11 +9990,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
{ {
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap, DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai)); no_bytes_in_map(&m_cols_ai));
res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap, res= res || write_data((uchar*)m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai)); no_bytes_in_map(&m_cols_ai));
} }
DBUG_DUMP("rows", m_rows_buf, data_size); DBUG_DUMP("rows", m_rows_buf, data_size);
res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size); res= res || write_data(m_rows_buf, (size_t) data_size);
return res; return res;
...@@ -10107,16 +10092,16 @@ bool Annotate_rows_log_event::is_valid() const ...@@ -10107,16 +10092,16 @@ bool Annotate_rows_log_event::is_valid() const
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Annotate_rows_log_event::write_data_header(IO_CACHE *file) bool Annotate_rows_log_event::write_data_header()
{ {
return 0; return 0;
} }
#endif #endif
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Annotate_rows_log_event::write_data_body(IO_CACHE *file) bool Annotate_rows_log_event::write_data_body()
{ {
return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len); return write_data(m_query_txt, m_query_len);
} }
#endif #endif
...@@ -10826,7 +10811,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -10826,7 +10811,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Table_map_log_event::write_data_header(IO_CACHE *file) bool Table_map_log_event::write_data_header()
{ {
DBUG_ASSERT(m_table_id != ~0UL); DBUG_ASSERT(m_table_id != ~0UL);
uchar buf[TABLE_MAP_HEADER_LEN]; uchar buf[TABLE_MAP_HEADER_LEN];
...@@ -10834,14 +10819,14 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file) ...@@ -10834,14 +10819,14 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file)
{ {
int4store(buf + 0, m_table_id); int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags); int2store(buf + 4, m_flags);
return (wrapper_my_b_safe_write(file, buf, 6)); return (write_data(buf, 6));
}); });
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags); int2store(buf + TM_FLAGS_OFFSET, m_flags);
return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); return write_data(buf, TABLE_MAP_HEADER_LEN);
} }
bool Table_map_log_event::write_data_body(IO_CACHE *file) bool Table_map_log_event::write_data_body()
{ {
DBUG_ASSERT(m_dbnam != NULL); DBUG_ASSERT(m_dbnam != NULL);
DBUG_ASSERT(m_tblnam != NULL); DBUG_ASSERT(m_tblnam != NULL);
...@@ -10862,15 +10847,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file) ...@@ -10862,15 +10847,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
uchar mbuf[MAX_INT_WIDTH]; uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size); uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) || return write_data(dbuf, sizeof(dbuf)) ||
wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) || write_data(m_dbnam, m_dblen+1) ||
wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) || write_data(tbuf, sizeof(tbuf)) ||
wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) || write_data(m_tblnam, m_tbllen+1) ||
wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) || write_data(cbuf, (size_t) (cbuf_end - cbuf)) ||
wrapper_my_b_safe_write(file, m_coltype, m_colcnt) || write_data(m_coltype, m_colcnt) ||
wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) || write_data(mbuf, (size_t) (mbuf_end - mbuf)) ||
wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size), write_data(m_field_metadata, m_field_metadata_size),
wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8)); write_data(m_null_bits, (m_colcnt + 7) / 8);
} }
#endif #endif
...@@ -12432,35 +12417,27 @@ Incident_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -12432,35 +12417,27 @@ Incident_log_event::do_apply_event(rpl_group_info *rgi)
} }
#endif #endif
#ifdef MYSQL_SERVER
bool bool
Incident_log_event::write_data_header(IO_CACHE *file) Incident_log_event::write_data_header()
{ {
DBUG_ENTER("Incident_log_event::write_data_header"); DBUG_ENTER("Incident_log_event::write_data_header");
DBUG_PRINT("enter", ("m_incident: %d", m_incident)); DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)]; uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident); int2store(buf, (int16) m_incident);
#ifndef MYSQL_CLIENT DBUG_RETURN(write_data(buf, sizeof(buf)));
DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf)));
#else
DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
#endif
} }
bool bool
Incident_log_event::write_data_body(IO_CACHE *file) Incident_log_event::write_data_body()
{ {
uchar tmp[1]; uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body"); DBUG_ENTER("Incident_log_event::write_data_body");
tmp[0]= (uchar) m_message.length; tmp[0]= (uchar) m_message.length;
crc= my_checksum(crc, (uchar*) tmp, 1); DBUG_RETURN(write_data(tmp, sizeof(tmp)) ||
if (m_message.length > 0) write_data(m_message.str, m_message.length));
{
crc= my_checksum(crc, (uchar*) m_message.str, m_message.length);
// todo: report a bug on write_str accepts uint but treats it as uchar
}
DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length));
} }
#endif
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
/** /**
......
...@@ -145,62 +145,8 @@ class String; ...@@ -145,62 +145,8 @@ class String;
#define LINE_START_EMPTY 0x8 #define LINE_START_EMPTY 0x8
#define ESCAPED_EMPTY 0x10 #define ESCAPED_EMPTY 0x10
/*****************************************************************************
old_sql_ex struct
****************************************************************************/
struct old_sql_ex
{
char field_term;
char enclosed;
char line_term;
char line_start;
char escaped;
char opt_flags;
char empty_flags;
};
#define NUM_LOAD_DELIM_STRS 5 #define NUM_LOAD_DELIM_STRS 5
/*****************************************************************************
sql_ex_info struct
****************************************************************************/
struct sql_ex_info
{
sql_ex_info() {} /* Remove gcc warning */
const char* field_term;
const char* enclosed;
const char* line_term;
const char* line_start;
const char* escaped;
int cached_new_format;
uint8 field_term_len,enclosed_len,line_term_len,line_start_len, escaped_len;
char opt_flags;
char empty_flags;
// store in new format even if old is possible
void force_new_format() { cached_new_format = 1;}
int data_size()
{
return (new_format() ?
field_term_len + enclosed_len + line_term_len +
line_start_len + escaped_len + 6 : 7);
}
bool write_data(IO_CACHE* file);
const char* init(const char* buf, const char* buf_end, bool use_new_format);
bool new_format()
{
return ((cached_new_format != -1) ? cached_new_format :
(cached_new_format=(field_term_len > 1 ||
enclosed_len > 1 ||
line_term_len > 1 || line_start_len > 1 ||
escaped_len > 1)));
}
};
/***************************************************************************** /*****************************************************************************
MySQL Binary Log MySQL Binary Log
...@@ -842,6 +788,33 @@ typedef struct st_print_event_info ...@@ -842,6 +788,33 @@ typedef struct st_print_event_info
} PRINT_EVENT_INFO; } PRINT_EVENT_INFO;
#endif #endif
/**
This class encapsulates writing of Log_event objects to IO_CACHE.
Automatically calculates the checksum if necessary.
*/
class Log_event_writer
{
public:
ulonglong bytes_written;
uint checksum_len;
int write(Log_event *ev);
int write_header(uchar *pos, size_t len);
int write_data(const uchar *pos, size_t len);
int write_footer();
my_off_t pos() { return my_b_safe_tell(file); }
Log_event_writer(IO_CACHE *file_arg) : bytes_written(0), file(file_arg) { }
private:
IO_CACHE *file;
/**
Placeholder for event checksum while writing to binlog.
*/
ha_checksum crc;
int write_internal(const uchar *pos, size_t len);
};
/** /**
the struct aggregates two paramenters that identify an event the struct aggregates two paramenters that identify an event
uniquely in scope of communication of a particular master and slave couple. uniquely in scope of communication of a particular master and slave couple.
...@@ -1108,10 +1081,7 @@ public: ...@@ -1108,10 +1081,7 @@ public:
*/ */
ulong slave_exec_mode; ulong slave_exec_mode;
/** Log_event_writer *writer;
Placeholder for event checksum while writing to binlog.
*/
ha_checksum crc;
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
THD* thd; THD* thd;
...@@ -1143,6 +1113,7 @@ public: ...@@ -1143,6 +1113,7 @@ public:
} }
#else #else
Log_event() : temp_buf(0), flags(0) {} Log_event() : temp_buf(0), flags(0) {}
ha_checksum crc;
/* print*() functions are used by mysqlbinlog */ /* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
void print_timestamp(IO_CACHE* file, time_t *ts = 0); void print_timestamp(IO_CACHE* file, time_t *ts = 0);
...@@ -1216,23 +1187,26 @@ public: ...@@ -1216,23 +1187,26 @@ public:
/* Placement version of the above operators */ /* Placement version of the above operators */
static void *operator new(size_t, void* ptr) { return ptr; } static void *operator new(size_t, void* ptr) { return ptr; }
static void operator delete(void*, void*) { } static void operator delete(void*, void*) { }
bool wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong data_length);
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write_header(IO_CACHE* file, ulong data_length); bool write_header(ulong data_length);
bool write_footer(IO_CACHE* file); bool write_data(const uchar *buf, ulong data_length)
{ return writer->write_data(buf, data_length); }
bool write_data(const char *buf, ulong data_length)
{ return write_data((uchar*)buf, data_length); }
bool write_footer()
{ return writer->write_footer(); }
my_bool need_checksum(); my_bool need_checksum();
virtual bool write(IO_CACHE* file) virtual bool write()
{ {
return(write_header(file, get_data_size()) || return write_header(get_data_size()) || write_data_header() ||
write_data_header(file) || write_data_body() || write_footer();
write_data_body(file) ||
write_footer(file));
} }
virtual bool write_data_header(IO_CACHE* file __attribute__((unused))) virtual bool write_data_header()
{ return 0; } { return 0; }
virtual bool write_data_body(IO_CACHE* file __attribute__((unused))) virtual bool write_data_body()
{ return 0; } { return 0; }
/* Return start of query time or current time */ /* Return start of query time or current time */
...@@ -1989,8 +1963,8 @@ public: ...@@ -1989,8 +1963,8 @@ public:
static int dummy_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); static int dummy_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; } virtual bool write_post_header_for_derived() { return FALSE; }
#endif #endif
bool is_valid() const { return query != 0; } bool is_valid() const { return query != 0; }
...@@ -2040,6 +2014,42 @@ public: /* !!! Public in this patch to allow old usage */ ...@@ -2040,6 +2014,42 @@ public: /* !!! Public in this patch to allow old usage */
}; };
/*****************************************************************************
sql_ex_info struct
****************************************************************************/
struct sql_ex_info
{
sql_ex_info() {} /* Remove gcc warning */
const char* field_term;
const char* enclosed;
const char* line_term;
const char* line_start;
const char* escaped;
int cached_new_format;
uint8 field_term_len,enclosed_len,line_term_len,line_start_len, escaped_len;
char opt_flags;
char empty_flags;
// store in new format even if old is possible
void force_new_format() { cached_new_format = 1;}
int data_size()
{
return (new_format() ?
field_term_len + enclosed_len + line_term_len +
line_start_len + escaped_len + 6 : 7);
}
bool write_data(Log_event_writer *writer);
const char* init(const char* buf, const char* buf_end, bool use_new_format);
bool new_format()
{
return ((cached_new_format != -1) ? cached_new_format :
(cached_new_format=(field_term_len > 1 ||
enclosed_len > 1 ||
line_term_len > 1 || line_start_len > 1 ||
escaped_len > 1)));
}
};
/** /**
@class Load_log_event @class Load_log_event
...@@ -2333,8 +2343,8 @@ public: ...@@ -2333,8 +2343,8 @@ public:
return sql_ex.new_format() ? NEW_LOAD_EVENT: LOAD_EVENT; return sql_ex.new_format() ? NEW_LOAD_EVENT: LOAD_EVENT;
} }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write_data_header(IO_CACHE* file); bool write_data_header();
bool write_data_body(IO_CACHE* file); bool write_data_body();
#endif #endif
bool is_valid() const { return table_name != 0; } bool is_valid() const { return table_name != 0; }
int get_data_size() int get_data_size()
...@@ -2422,7 +2432,7 @@ public: ...@@ -2422,7 +2432,7 @@ public:
my_off_t get_header_len(my_off_t l __attribute__((unused))) my_off_t get_header_len(my_off_t l __attribute__((unused)))
{ return LOG_EVENT_MINIMAL_HEADER_LEN; } { return LOG_EVENT_MINIMAL_HEADER_LEN; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
#endif #endif
bool is_valid() const { return server_version[0] != 0; } bool is_valid() const { return server_version[0] != 0; }
int get_data_size() int get_data_size()
...@@ -2492,7 +2502,7 @@ public: ...@@ -2492,7 +2502,7 @@ public:
} }
Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;} Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;}
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
#endif #endif
bool header_is_valid() const bool header_is_valid() const
{ {
...@@ -2601,7 +2611,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, ...@@ -2601,7 +2611,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
const char* get_var_type_name(); const char* get_var_type_name();
int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;} int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;}
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
#endif #endif
bool is_valid() const { return 1; } bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; } bool is_part_of_group() { return 1; }
...@@ -2681,7 +2691,7 @@ class Rand_log_event: public Log_event ...@@ -2681,7 +2691,7 @@ class Rand_log_event: public Log_event
Log_event_type get_type_code() { return RAND_EVENT;} Log_event_type get_type_code() { return RAND_EVENT;}
int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ } int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
#endif #endif
bool is_valid() const { return 1; } bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; } bool is_part_of_group() { return 1; }
...@@ -2731,7 +2741,7 @@ class Xid_log_event: public Log_event ...@@ -2731,7 +2741,7 @@ class Xid_log_event: public Log_event
Log_event_type get_type_code() { return XID_EVENT;} Log_event_type get_type_code() { return XID_EVENT;}
int get_data_size() { return sizeof(xid); } int get_data_size() { return sizeof(xid); }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
#endif #endif
bool is_valid() const { return 1; } bool is_valid() const { return 1; }
...@@ -2792,7 +2802,7 @@ public: ...@@ -2792,7 +2802,7 @@ public:
~User_var_log_event() {} ~User_var_log_event() {}
Log_event_type get_type_code() { return USER_VAR_EVENT;} Log_event_type get_type_code() { return USER_VAR_EVENT;}
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
/* /*
Getter and setter for deferred User-event. Getter and setter for deferred User-event.
Returns true if the event is not applied directly Returns true if the event is not applied directly
...@@ -2944,7 +2954,7 @@ public: ...@@ -2944,7 +2954,7 @@ public:
int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} int get_data_size() { return ident_len + ROTATE_HEADER_LEN;}
bool is_valid() const { return new_log_ident != 0; } bool is_valid() const { return new_log_ident != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
#endif #endif
private: private:
...@@ -2977,7 +2987,7 @@ public: ...@@ -2977,7 +2987,7 @@ public:
int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;} int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;}
bool is_valid() const { return binlog_file_name != 0; } bool is_valid() const { return binlog_file_name != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
enum_skip_reason do_shall_skip(rpl_group_info *rgi); enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif #endif
}; };
...@@ -3105,7 +3115,7 @@ public: ...@@ -3105,7 +3115,7 @@ public:
} }
bool is_valid() const { return seq_no != 0; } bool is_valid() const { return seq_no != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE *file); bool write();
static int make_compatible_event(String *packet, bool *need_dummy_event, static int make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
static bool peek(const char *event_start, size_t event_len, static bool peek(const char *event_start, size_t event_len,
...@@ -3220,7 +3230,7 @@ public: ...@@ -3220,7 +3230,7 @@ public:
bool is_valid() const { return list != NULL; } bool is_valid() const { return list != NULL; }
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
bool to_packet(String *packet); bool to_packet(String *packet);
bool write(IO_CACHE *file); bool write();
virtual int do_apply_event(rpl_group_info *rgi); virtual int do_apply_event(rpl_group_info *rgi);
enum_skip_reason do_shall_skip(rpl_group_info *rgi); enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif #endif
...@@ -3291,13 +3301,13 @@ public: ...@@ -3291,13 +3301,13 @@ public:
} }
bool is_valid() const { return inited_from_old || block != 0; } bool is_valid() const { return inited_from_old || block != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write_data_header(IO_CACHE* file); bool write_data_header();
bool write_data_body(IO_CACHE* file); bool write_data_body();
/* /*
Cut out Create_file extentions and Cut out Create_file extentions and
write it as Load event - used on the slave write it as Load event - used on the slave
*/ */
bool write_base(IO_CACHE* file); bool write_base();
#endif #endif
private: private:
...@@ -3351,7 +3361,7 @@ public: ...@@ -3351,7 +3361,7 @@ public:
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;} int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() const { return block != 0; } bool is_valid() const { return block != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
const char* get_db() { return db; } const char* get_db() { return db; }
#endif #endif
...@@ -3392,7 +3402,7 @@ public: ...@@ -3392,7 +3402,7 @@ public:
int get_data_size() { return DELETE_FILE_HEADER_LEN ;} int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() const { return file_id != 0; } bool is_valid() const { return file_id != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
const char* get_db() { return db; } const char* get_db() { return db; }
#endif #endif
...@@ -3432,7 +3442,7 @@ public: ...@@ -3432,7 +3442,7 @@ public:
int get_data_size() { return EXEC_LOAD_HEADER_LEN ;} int get_data_size() { return EXEC_LOAD_HEADER_LEN ;}
bool is_valid() const { return file_id != 0; } bool is_valid() const { return file_id != 0; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(IO_CACHE* file); bool write();
const char* get_db() { return db; } const char* get_db() { return db; }
#endif #endif
...@@ -3532,7 +3542,7 @@ public: ...@@ -3532,7 +3542,7 @@ public:
ulong get_post_header_size_for_derived(); ulong get_post_header_size_for_derived();
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write_post_header_for_derived(IO_CACHE* file); bool write_post_header_for_derived();
#endif #endif
private: private:
...@@ -3596,8 +3606,8 @@ public: ...@@ -3596,8 +3606,8 @@ public:
virtual bool is_part_of_group() { return 1; } virtual bool is_part_of_group() { return 1; }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
virtual bool write_data_header(IO_CACHE*); virtual bool write_data_header();
virtual bool write_data_body(IO_CACHE*); virtual bool write_data_body();
#endif #endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
...@@ -4012,8 +4022,8 @@ public: ...@@ -4012,8 +4022,8 @@ public:
virtual int get_data_size() { return (uint) m_data_size; } virtual int get_data_size() { return (uint) m_data_size; }
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
virtual int save_field_metadata(); virtual int save_field_metadata();
virtual bool write_data_header(IO_CACHE *file); virtual bool write_data_header();
virtual bool write_data_body(IO_CACHE *file); virtual bool write_data_body();
virtual const char *get_db() { return m_dbnam; } virtual const char *get_db() { return m_dbnam; }
#endif #endif
...@@ -4211,8 +4221,8 @@ public: ...@@ -4211,8 +4221,8 @@ public:
#endif #endif
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
virtual bool write_data_header(IO_CACHE *file); virtual bool write_data_header();
virtual bool write_data_body(IO_CACHE *file); virtual bool write_data_body();
virtual const char *get_db() { return m_table->s->db.str; } virtual const char *get_db() { return m_table->s->db.str; }
#endif #endif
/* /*
...@@ -4674,6 +4684,9 @@ public: ...@@ -4674,6 +4684,9 @@ public:
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
void pack_info(THD *thd, Protocol*); void pack_info(THD *thd, Protocol*);
virtual bool write_data_header();
virtual bool write_data_body();
#endif #endif
Incident_log_event(const char *buf, uint event_len, Incident_log_event(const char *buf, uint event_len,
...@@ -4689,9 +4702,6 @@ public: ...@@ -4689,9 +4702,6 @@ public:
virtual int do_apply_event(rpl_group_info *rgi); virtual int do_apply_event(rpl_group_info *rgi);
#endif #endif
virtual bool write_data_header(IO_CACHE *file);
virtual bool write_data_body(IO_CACHE *file);
virtual Log_event_type get_type_code() { return INCIDENT_EVENT; } virtual Log_event_type get_type_code() { return INCIDENT_EVENT; }
virtual bool is_valid() const virtual bool is_valid() const
...@@ -4752,6 +4762,14 @@ private: ...@@ -4752,6 +4762,14 @@ private:
uint ident_len; uint ident_len;
}; };
inline int Log_event_writer::write(Log_event *ev)
{
ev->writer= this;
int res= ev->write();
IF_DBUG(ev->writer= 0,); // writer must be set before every Log_event::write
return res;
}
/** /**
The function is called by slave applier in case there are The function is called by slave applier in case there are
active table filtering rules to force gathering events associated active table filtering rules to force gathering events associated
......
...@@ -1753,7 +1753,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -1753,7 +1753,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
bool Old_rows_log_event::write_data_header(IO_CACHE *file) bool Old_rows_log_event::write_data_header()
{ {
uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
...@@ -1765,15 +1765,15 @@ bool Old_rows_log_event::write_data_header(IO_CACHE *file) ...@@ -1765,15 +1765,15 @@ bool Old_rows_log_event::write_data_header(IO_CACHE *file)
{ {
int4store(buf + 0, m_table_id); int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags); int2store(buf + 4, m_flags);
return (my_b_safe_write(file, buf, 6)); return write_data(buf, 6);
}); });
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags); int2store(buf + RW_FLAGS_OFFSET, m_flags);
return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); return write_data(buf, ROWS_HEADER_LEN);
} }
bool Old_rows_log_event::write_data_body(IO_CACHE*file) bool Old_rows_log_event::write_data_body()
{ {
/* /*
Note that this should be the number of *bits*, not the number of Note that this should be the number of *bits*, not the number of
...@@ -1790,13 +1790,12 @@ bool Old_rows_log_event::write_data_body(IO_CACHE*file) ...@@ -1790,13 +1790,12 @@ bool Old_rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap, res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
no_bytes_in_map(&m_cols));
DBUG_DUMP("rows", m_rows_buf, data_size); DBUG_DUMP("rows", m_rows_buf, data_size);
res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size); res= res || write_data(m_rows_buf, (size_t) data_size);
return res; return res;
......
...@@ -134,8 +134,8 @@ public: ...@@ -134,8 +134,8 @@ public:
ulong get_table_id() const { return m_table_id; } ulong get_table_id() const { return m_table_id; }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
virtual bool write_data_header(IO_CACHE *file); virtual bool write_data_header();
virtual bool write_data_body(IO_CACHE *file); virtual bool write_data_body();
virtual const char *get_db() { return m_table->s->db.str; } virtual const char *get_db() { return m_table->s->db.str; }
#endif #endif
/* /*
......
...@@ -5501,7 +5501,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5501,7 +5501,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->master_log_name, rev.new_log_ident); mi->master_log_name, rev.new_log_ident);
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
if (likely(!fdle.write(rli->relay_log.get_log_file()) && if (likely(!rli->relay_log.write_event(&fdle) &&
!rli->relay_log.flush_and_sync(NULL))) !rli->relay_log.flush_and_sync(NULL)))
{ {
rli->relay_log.harvest_bytes_written(&rli->log_space_total); rli->relay_log.harvest_bytes_written(&rli->log_space_total);
......
...@@ -445,6 +445,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, ...@@ -445,6 +445,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
char filename[PATH_MAX]= {0}; char filename[PATH_MAX]= {0};
File file; File file;
IO_CACHE cache; IO_CACHE cache;
Log_event_writer writer(&cache);
Format_description_log_event *ev= wsrep_get_apply_format(thd); Format_description_log_event *ev= wsrep_get_apply_format(thd);
int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log", int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log",
...@@ -476,7 +477,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, ...@@ -476,7 +477,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
goto cleanup2; goto cleanup2;
} }
if (ev->write(&cache) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
flush_io_cache(&cache)) flush_io_cache(&cache))
{ {
WSREP_ERROR("Failed to write to '%s'.", filename); WSREP_ERROR("Failed to write to '%s'.", filename);
......
...@@ -1215,6 +1215,7 @@ int wsrep_to_buf_helper( ...@@ -1215,6 +1215,7 @@ int wsrep_to_buf_helper(
THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{ {
IO_CACHE tmp_io_cache; IO_CACHE tmp_io_cache;
Log_event_writer writer(&tmp_io_cache);
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
65536, MYF(MY_WME))) 65536, MYF(MY_WME)))
return 1; return 1;
...@@ -1222,7 +1223,7 @@ int wsrep_to_buf_helper( ...@@ -1222,7 +1223,7 @@ int wsrep_to_buf_helper(
Format_description_log_event *tmp_fd= new Format_description_log_event(4); Format_description_log_event *tmp_fd= new Format_description_log_event(4);
tmp_fd->checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; tmp_fd->checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options;
tmp_fd->write(&tmp_io_cache); writer.write(tmp_fd);
delete tmp_fd; delete tmp_fd;
#ifdef GTID_SUPPORT #ifdef GTID_SUPPORT
...@@ -1230,7 +1231,7 @@ int wsrep_to_buf_helper( ...@@ -1230,7 +1231,7 @@ int wsrep_to_buf_helper(
{ {
Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next); Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
if (!gtid_ev.is_valid()) ret= 0; if (!gtid_ev.is_valid()) ret= 0;
if (!ret && gtid_ev.write(&tmp_io_cache)) ret= 1; if (!ret && writer.write(&gtid_ev)) ret= 1;
} }
#endif /* GTID_SUPPORT */ #endif /* GTID_SUPPORT */
...@@ -1240,12 +1241,12 @@ int wsrep_to_buf_helper( ...@@ -1240,12 +1241,12 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, thd->wsrep_TOI_pre_query, Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
thd->wsrep_TOI_pre_query_len, thd->wsrep_TOI_pre_query_len,
FALSE, FALSE, FALSE, 0); FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1; if (writer.write(&ev)) ret= 1;
} }
/* continue to append the actual query */ /* continue to append the actual query */
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
if (!ret && ev.write(&tmp_io_cache)) ret= 1; if (!ret && writer.write(&ev)) ret= 1;
if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
close_cached_file(&tmp_io_cache); close_cached_file(&tmp_io_cache);
return ret; return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment