Commit 8a93a7c0 authored by Teemu Ollakka's avatar Teemu Ollakka Committed by Nirbhay Choubey

refs codership/mysql-wsrep#226 Limit binlog recovery to found wsrep position

Limit binlog recovery so that the wsrep position found from
storage engines is not exceeded. This is required to have consistent
position between wsrep position stored in innodb header and
recoverd binlog.
parent 652e4c1d
......@@ -7701,6 +7701,22 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
HASH xids;
MEM_ROOT mem_root;
#ifdef WITH_WSREP
/*
Read current wsrep position from storage engines to have consistent
end position for binlog scan.
*/
wsrep_uuid_t uuid;
wsrep_seqno_t seqno;
wsrep_get_SE_checkpoint(uuid, seqno);
char uuid_str[40];
wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
WSREP_INFO("Binlog recovery, found wsrep position %s:%lld", uuid_str,
(long long)seqno);
const wsrep_seqno_t last_xid_seqno= seqno;
wsrep_seqno_t cur_xid_seqno= WSREP_SEQNO_UNDEFINED;
#endif /* WITH_WSREP */
if (! fdle->is_valid() ||
my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
sizeof(my_xid), 0, 0, MYF(0)))
......@@ -7712,7 +7728,12 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
while ((ev= Log_event::read_log_event(log, 0, fdle,
opt_master_verify_checksum))
&& ev->is_valid())
&& ev->is_valid()
#ifdef WITH_WSREP
&& (last_xid_seqno == WSREP_SEQNO_UNDEFINED ||
last_xid_seqno != cur_xid_seqno)
#endif
)
{
if (ev->get_type_code() == XID_EVENT)
{
......@@ -7721,10 +7742,18 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
sizeof(xev->xid));
if (!x || my_hash_insert(&xids, x))
goto err2;
#ifdef WITH_WSREP
cur_xid_seqno= xev->xid;
#endif /* WITH_WSREP */
}
delete ev;
}
#ifdef WITH_WSREP
WSREP_INFO("Binlog recovery scan stopped at Xid event %lld",
(long long)cur_xid_seqno);
#endif /* WITH_WSREP */
if (ha_recover(&xids))
goto err2;
......
......@@ -207,6 +207,30 @@ void wsrep_get_SE_checkpoint(XID* xid)
plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
}
void wsrep_get_SE_checkpoint(wsrep_uuid_t& uuid, wsrep_seqno_t& seqno)
{
uuid= WSREP_UUID_UNDEFINED;
seqno= WSREP_SEQNO_UNDEFINED;
XID xid;
memset(&xid, 0, sizeof(xid));
xid.formatID= -1;
wsrep_get_SE_checkpoint(&xid);
if (xid.formatID == -1) return; // nil XID
if (!wsrep_is_wsrep_xid(&xid))
{
WSREP_WARN("Read non-wsrep XID from storage engines.");
return;
}
uuid= *wsrep_xid_uuid(&xid);
seqno= wsrep_xid_seqno(&xid);
}
static wsrep_cb_status_t
wsrep_view_handler_cb (void* app_ctx,
void* recv_ctx,
......
......@@ -316,7 +316,7 @@ int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len);
const wsrep_uuid_t* wsrep_cluster_uuid();
struct xid_t;
void wsrep_set_SE_checkpoint(xid_t*);
void wsrep_get_SE_checkpoint(wsrep_uuid_t&, wsrep_seqno_t&);
void wsrep_xid_init(xid_t*, const wsrep_uuid_t*, wsrep_seqno_t);
const wsrep_uuid_t* wsrep_xid_uuid(const xid_t*);
wsrep_seqno_t wsrep_xid_seqno(const xid_t*);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment