Commit ad86b3e4 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - wl#3023 : pass tables per GCI to injector at epoch start

parent d8957fbf
......@@ -2829,10 +2829,41 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
bzero((char*) &row, sizeof(row));
injector::transaction trans= inj->new_trans(thd);
{ // pass table map before epoch
Uint32 iter=0;
const NdbEventOperation* gci_op;
Uint32 event_types;
while ((gci_op=ndb->getGCIEventOperations(&iter, &event_types))
!= NULL)
{
NDB_SHARE* share=(NDB_SHARE*)gci_op->getCustomData();
DBUG_PRINT("info", ("per gci op %p share %p event types 0x%x",
gci_op, share, event_types));
// this should not happen
if (share == NULL || share->table == NULL)
{
DBUG_PRINT("info", ("no share or table !"));
continue;
}
TABLE* table=share->table;
const LEX_STRING& name=table->s->table_name;
DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
injector::transaction::table tbl(table, true);
// TODO enable when mats patch pushed
//trans.use_table(::server_id, tbl);
}
}
gci= pOp->getGCI();
if (apply_status_share)
{
TABLE *table= apply_status_share->table;
const LEX_STRING& name=table->s->table_name;
DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
injector::transaction::table tbl(table, true);
// TODO enable when mats patch pushed
//trans.use_table(::server_id, tbl);
MY_BITMAP b;
uint32 bitbuf;
DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
......
......@@ -1240,6 +1240,18 @@ public:
*/
NdbEventOperation *nextEvent();
/**
* Iterate over distinct event operations which are part of current
* GCI. Valid after nextEvent. Used to get summary information for
* the epoch (e.g. list of all tables) before processing event data.
*
* Set *iter=0 to start. Returns NULL when no more. If event_types
* is not NULL, it returns bitmask of received event types.
*/
const NdbEventOperation*
getGCIEventOperations(Uint32* iter, Uint32* event_types);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
Uint64 getLatestGCI();
......
......@@ -1293,6 +1293,16 @@ NdbEventOperation *Ndb::nextEvent()
return theEventBuffer->nextEvent();
}
const NdbEventOperation*
Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
NdbEventOperationImpl* op =
theEventBuffer->getGCIEventOperations(iter, event_types);
if (op != NULL)
return op->m_facade;
return NULL;
}
Uint64 Ndb::getLatestGCI()
{
return theEventBuffer->getLatestGCI();
......
......@@ -1081,6 +1081,19 @@ NdbEventBuffer::nextEvent()
DBUG_RETURN_EVENT(0);
}
NdbEventOperationImpl*
NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
if (*iter < m_available_data.m_gci_op_count)
{
EventBufData_list::Gci_op g = m_available_data.m_gci_op_list[(*iter)++];
if (event_types != NULL)
*event_types = g.event_types;
return g.op;
}
return NULL;
}
void
NdbEventBuffer::lock()
{
......@@ -2061,7 +2074,36 @@ NdbEventBuffer::free_list(EventBufData_list &list)
}
// list returned to m_free_data
new (&list) EventBufData_list;
list.m_head = list.m_tail = NULL;
list.m_count = list.m_sz = 0;
}
void
EventBufData_list::add_gci_op(Gci_op g)
{
assert(g.op != NULL);
Uint32 i;
for (i = 0; i < m_gci_op_count; i++) {
if (m_gci_op_list[i].op == g.op)
break;
}
if (i < m_gci_op_count) {
m_gci_op_list[i].event_types |= g.event_types;
} else {
if (m_gci_op_count == m_gci_op_alloc) {
Uint32 n = 1 + 2 * m_gci_op_alloc;
Gci_op* old_list = m_gci_op_list;
m_gci_op_list = new Gci_op [n];
if (m_gci_op_alloc != 0) {
Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
memcpy(m_gci_op_list, old_list, bytes);
delete [] old_list;
}
m_gci_op_alloc = n;
}
assert(m_gci_op_count < m_gci_op_alloc);
m_gci_op_list[m_gci_op_count++] = g;
}
}
NdbEventOperation*
......
......@@ -76,19 +76,31 @@ public:
EventBufData *m_head, *m_tail;
unsigned m_count;
unsigned m_sz;
// distinct ops per gci (assume no hash needed)
struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; };
Gci_op* m_gci_op_list;
Uint32 m_gci_op_count;
Uint32 m_gci_op_alloc;
private:
void add_gci_op(Gci_op g);
};
inline
EventBufData_list::EventBufData_list()
: m_head(0), m_tail(0),
m_count(0),
m_sz(0)
m_sz(0),
m_gci_op_list(NULL),
m_gci_op_count(0),
m_gci_op_alloc(0)
{
}
inline
EventBufData_list::~EventBufData_list()
{
delete [] m_gci_op_list;
}
inline
......@@ -110,6 +122,9 @@ void EventBufData_list::remove_first()
inline
void EventBufData_list::append(EventBufData *data)
{
Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
add_gci_op(g);
data->m_next= 0;
if (m_tail)
m_tail->m_next= data;
......@@ -130,6 +145,10 @@ void EventBufData_list::append(EventBufData *data)
inline
void EventBufData_list::append(const EventBufData_list &list)
{
Uint32 i;
for (i = 0; i < list.m_gci_op_count; i++)
add_gci_op(list.m_gci_op_list[i]);
if (m_tail)
m_tail->m_next= list.m_head;
else
......@@ -265,7 +284,6 @@ private:
void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};
class NdbEventBuffer {
public:
NdbEventBuffer(Ndb*);
......@@ -303,6 +321,8 @@ public:
int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
NdbEventOperation *nextEvent();
NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
Uint32* event_types);
NdbEventOperationImpl *move_data();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment