added assert for when ndb share is not released as it should be

+ corrected the bugs in this revealed by mysql-test-run
+ removed some debug printouts
parent f569266b
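
The core of the change is the new NSF_NO_BINLOG share flag: a share carrying it is skipped when scanning open tables and when creating binlog events and event operations, and a failed blob-table setup now sets the flag instead of failing again later. Below is a minimal standalone sketch of that guard pattern, not the actual patch; NDB_SHARE_SKETCH and setup_binlog_for_share are hypothetical stand-ins, while the flag values mirror the diff.

#include <assert.h>

#define NSF_HIDDEN_PK 1   /* table has hidden primary key */
#define NSF_NO_BINLOG 4   /* table should not be binlogged */

struct NDB_SHARE_SKETCH {
  unsigned flags;
};

/* Returns 0 (skip silently) when binlogging is disabled for the share. */
static int setup_binlog_for_share(NDB_SHARE_SKETCH *share)
{
  if (share == 0)
    return 0;                       /* no share, nothing to set up */
  if (share->flags & NSF_NO_BINLOG)
    return 0;                       /* table marked as not binlogged */
  /* ... create NDB event and event operations here ... */
  return 0;
}

int main(void)
{
  NDB_SHARE_SKETCH s = { NSF_NO_BINLOG };
  assert(setup_binlog_for_share(&s) == 0);   /* skipped, no error */
  return 0;
}

The early returns keep the skip silent, matching the DBUG_RETURN(0) paths added in ndbcluster_create_event and ndbcluster_create_event_ops below.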
......@@ -5307,7 +5307,7 @@ int ndbcluster_find_all_files(THD *thd)
pthread_mutex_lock(&ndbcluster_mutex);
if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
(byte*) key, strlen(key)))
&& share->op == 0 && share->op_old == 0)
&& share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
|| share == 0)
{
/*
......@@ -5451,7 +5451,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
(byte*)name, end - name))
&& share->op == 0 && share->op_old == 0)
&& share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
{
/*
there is no binlog creation setup for this table
......@@ -5464,6 +5464,8 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
pthread_mutex_unlock(&LOCK_open);
pthread_mutex_lock(&ndbcluster_mutex);
}
/* Table existed in the mysqld so there should be a share */
DBUG_ASSERT(share != NULL);
}
pthread_mutex_unlock(&ndbcluster_mutex);
}
......@@ -6276,6 +6278,11 @@ int handle_trailing_share(NDB_SHARE *share)
share->key, share->use_count);
dbug_print_open_tables();
/*
Ndb share has not been released as it should have been
*/
DBUG_ASSERT(FALSE);
/*
This is probably an error. We can however save the situation
at the cost of a possible mem leak, by "renaming" the share
......
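
The hunk above adds an unconditional DBUG_ASSERT(FALSE) so that debug builds stop immediately when a trailing share is found, while release builds keep the existing workaround of "renaming" the share at the cost of a possible memory leak. A minimal sketch of that pattern, using plain assert() and hypothetical names (NDB_SHARE_SKETCH, handle_trailing_share_sketch):

#include <stdio.h>
#include <assert.h>

struct NDB_SHARE_SKETCH {
  const char *key;
  unsigned use_count;
};

/* Called when a share that should already be gone is still registered. */
static void handle_trailing_share_sketch(NDB_SHARE_SKETCH *share)
{
  fprintf(stderr, "trailing share %s, use_count %u\n",
          share->key, share->use_count);
  /* Ndb share has not been released as it should have been */
  assert(!"trailing NDB_SHARE");   /* fires in debug builds, no-op with -DNDEBUG */
  /* release-build fallback: "rename" the share and accept a possible leak */
}

int main(void)
{
  NDB_SHARE_SKETCH s = { "./test/t1", 1 };
  handle_trailing_share_sketch(&s);   /* aborts unless compiled with -DNDEBUG */
  return 0;
}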
......@@ -113,6 +113,7 @@ typedef struct st_ndbcluster_share {
#ifdef HAVE_NDB_BINLOG
/* NDB_SHARE.flags */
#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
#define NSF_NO_BINLOG 4 /* table should not be binlogged */
#endif
typedef enum ndb_item_type {
......
......@@ -1714,7 +1714,16 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
{
DBUG_ENTER("ndbcluster_create_event");
if (!share)
{
DBUG_PRINT("info", ("share == NULL"));
DBUG_RETURN(0);
}
if (share->flags & NSF_NO_BINLOG)
{
DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG));
DBUG_RETURN(0);
}
NDBDICT *dict= ndb->getDictionary();
NDBEVENT my_event(event_name);
my_event.setTable(*ndbtab);
......@@ -1831,6 +1840,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
DBUG_ASSERT(share != 0);
if (share->flags & NSF_NO_BINLOG)
{
DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags));
DBUG_RETURN(0);
}
if (share->op)
{
assert(share->op->getCustomData() == (void *) share);
......@@ -1854,6 +1869,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
{
sql_print_error("NDB Binlog: logging of blob table %s "
"is not supported", share->key);
share->flags|= NSF_NO_BINLOG;
DBUG_RETURN(0);
}
}
......
......@@ -55,6 +55,19 @@ static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
*/
//#define EVENT_DEBUG
#ifdef EVENT_DEBUG
#define DBUG_ENTER_EVENT(A) DBUG_ENTER(A)
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
#else
#define DBUG_ENTER_EVENT(A)
#define DBUG_RETURN_EVENT(A) return(A)
#define DBUG_VOID_RETURN_EVENT return
#define DBUG_PRINT_EVENT(A,B)
#define DBUG_DUMP_EVENT(A,B,C)
#endif
// todo handle several ndb objects
// todo free allocated data when closing NdbEventBuffer
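
The remaining hunks swap the unconditional DBUG_ENTER/DBUG_PRINT/DBUG_RETURN calls in the event-buffer code for the DBUG_*_EVENT variants defined above, so per-event tracing is only compiled in when EVENT_DEBUG is defined. A minimal standalone sketch of that compile-time toggle, using a hypothetical TRACE_EVENT/RETURN_EVENT pair in place of MySQL's dbug macros:

#include <stdio.h>

/* #define EVENT_DEBUG */

#ifdef EVENT_DEBUG
#define TRACE_EVENT(msg)    fprintf(stderr, "event: %s\n", (msg))
#define RETURN_EVENT(code)  do { TRACE_EVENT("return"); return (code); } while (0)
#else
#define TRACE_EVENT(msg)    /* compiled out */
#define RETURN_EVENT(code)  return (code)
#endif

/* Mirrors the shape of receive_event(): trace only in EVENT_DEBUG builds. */
static int receive_event_sketch(unsigned operation)
{
  TRACE_EVENT("receive_event");
  if (operation >= 16)      /* stand-in for _TE_FIRST_NON_DATA_EVENT */
    RETURN_EVENT(1);
  RETURN_EVENT(0);
}

int main(void)
{
  printf("%d\n", receive_event_sketch(3));   /* prints 0 */
  return 0;
}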
......@@ -343,14 +356,14 @@ NdbEventOperationImpl::getLatestGCI()
int
NdbEventOperationImpl::receive_event()
{
DBUG_ENTER("NdbEventOperationImpl::receive_event");
DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event");
Uint32 operation= (Uint32)m_data_item->sdata->operation;
DBUG_PRINT("info",("sdata->operation %u",operation));
DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
{
DBUG_RETURN(1);
DBUG_RETURN_EVENT(1);
}
// now move the data into the RecAttrs
......@@ -361,8 +374,8 @@ NdbEventOperationImpl::receive_event()
Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
Uint32 *aDataPtr = m_data_item->ptr[1].p;
DBUG_DUMP("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
DBUG_DUMP("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
// copy data into the RecAttr's
// we assume that the respective attribute lists are sorted
......@@ -402,7 +415,7 @@ NdbEventOperationImpl::receive_event()
tDataSz = AttributeHeader(*aAttrPtr).getByteSize();
while (tAttrId > tRecAttrId) {
DBUG_PRINT("info",("undef [%u] %u 0x%x [%u] 0x%x",
DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x",
tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
......@@ -416,7 +429,7 @@ NdbEventOperationImpl::receive_event()
if (tAttrId == tRecAttrId) {
hasSomeData++;
DBUG_PRINT("info",("set [%u] %u 0x%x [%u] 0x%x",
DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x",
tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
......@@ -467,10 +480,10 @@ NdbEventOperationImpl::receive_event()
if (hasSomeData || !is_update)
{
DBUG_RETURN(1);
DBUG_RETURN_EVENT(1);
}
DBUG_RETURN(0);
DBUG_RETURN_EVENT(0);
}
NdbDictionary::Event::TableEvent
......@@ -714,7 +727,7 @@ print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3])
NdbEventOperation *
NdbEventBuffer::nextEvent()
{
DBUG_ENTER("NdbEventBuffer::nextEvent");
DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent");
#ifdef VM_TRACE
const char *m_latest_command_save= m_latest_command;
#endif
......@@ -764,7 +777,7 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE
m_latest_command= m_latest_command_save;
#endif
DBUG_RETURN(op->m_facade);
DBUG_RETURN_EVENT(op->m_facade);
}
// the next event belonged to an event op that is no
// longer valid, skip to next
......@@ -778,7 +791,7 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE
m_latest_command= m_latest_command_save;
#endif
DBUG_RETURN(0);
DBUG_RETURN_EVENT(0);
}
void
......@@ -882,7 +895,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
return;
}
DBUG_ENTER("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");
DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP");
const Uint64 gci= rep->gci;
const Uint32 cnt= rep->gcp_complete_rep_count;
......@@ -901,7 +914,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
{
ndbout << i << " - " << m_active_gci[i] << endl;
}
DBUG_VOID_RETURN;
DBUG_VOID_RETURN_EVENT;
}
Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
......@@ -949,7 +962,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
}
}
DBUG_VOID_RETURN;
DBUG_VOID_RETURN_EVENT;
}
void
......@@ -1145,16 +1158,15 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
const SubTableData * const sdata,
LinearSectionPtr ptr[3])
{
DBUG_ENTER("NdbEventBuffer::insertDataL");
DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
Uint64 gci= sdata->gci;
if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) )
{
Gci_container* bucket= find_bucket(&m_active_gci, gci);
DBUG_PRINT("info", ("data insertion in eventId %d", op->m_eventId));
DBUG_PRINT("info", ("gci=%d tab=%d op=%d node=%d",
DBUG_PRINT_EVENT("info", ("data insertion in eventId %d", op->m_eventId));
DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d",
sdata->gci, sdata->tableId, sdata->operation,
sdata->req_nodeid));
......@@ -1164,7 +1176,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
* Already completed GCI...
* Possible in case of resend during NF handling
*/
DBUG_RETURN(0);
DBUG_RETURN_EVENT(0);
}
bool use_hash =
......@@ -1187,13 +1199,13 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
if (unlikely(data == 0))
{
op->m_has_error = 2;
DBUG_RETURN(-1);
DBUG_RETURN_EVENT(-1);
}
if (unlikely(copy_data(sdata, ptr, data)))
{
op->m_has_error = 3;
DBUG_RETURN(-1);
DBUG_RETURN_EVENT(-1);
}
// add it to list and hash table
bucket->m_data.append(data);
......@@ -1211,7 +1223,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
if (unlikely(merge_data(sdata, ptr, data)))
{
op->m_has_error = 3;
DBUG_RETURN(-1);
DBUG_RETURN_EVENT(-1);
}
}
data->m_event_op = op;
......@@ -1219,22 +1231,22 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{
data->m_pkhash = hpos.pkhash;
}
DBUG_RETURN(0);
DBUG_RETURN_EVENT(0);
}
#ifdef VM_TRACE
if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation)
{
// XXX never reached
DBUG_PRINT("info",("Data arrived before ready eventId", op->m_eventId));
DBUG_RETURN(0);
DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
DBUG_RETURN_EVENT(0);
}
else {
DBUG_PRINT("info",("skipped"));
DBUG_RETURN(0);
DBUG_PRINT_EVENT("info",("skipped"));
DBUG_RETURN_EVENT(0);
}
#else
return 0;
DBUG_RETURN_EVENT(0);
#endif
}
......@@ -1242,7 +1254,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
EventBufData*
NdbEventBuffer::alloc_data()
{
DBUG_ENTER("alloc_data");
DBUG_ENTER_EVENT("alloc_data");
EventBufData* data = m_free_data;
if (unlikely(data == 0))
......@@ -1268,7 +1280,7 @@ NdbEventBuffer::alloc_data()
m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
printf("m_used_data_count %d\n", m_used_data.m_count);
#endif
DBUG_RETURN(0); // TODO handle this, overrun, or, skip?
DBUG_RETURN_EVENT(0); // TODO handle this, overrun, or, skip?
}
}
......@@ -1280,7 +1292,7 @@ NdbEventBuffer::alloc_data()
assert(m_free_data_sz >= data->sz);
#endif
m_free_data_sz -= data->sz;
DBUG_RETURN(data);
DBUG_RETURN_EVENT(data);
}
// allocate initial or bigger memory area in EventBufData
......@@ -1328,15 +1340,15 @@ NdbEventBuffer::copy_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3],
EventBufData* data)
{
DBUG_ENTER("NdbEventBuffer::copy_data");
DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");
if (alloc_mem(data, ptr) != 0)
DBUG_RETURN(-1);
DBUG_RETURN_EVENT(-1);
memcpy(data->sdata, sdata, sizeof(SubTableData));
int i;
for (i = 0; i <= 2; i++)
memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
DBUG_RETURN(0);
DBUG_RETURN_EVENT(0);
}
static struct Ev_t {
......@@ -1406,14 +1418,14 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr2[3],
EventBufData* data)
{
DBUG_ENTER("NdbEventBuffer::merge_data");
DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");
Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
int t1 = data->sdata->operation;
int t2 = sdata->operation;
if (t1 == Ev_t::NUL)
DBUG_RETURN(copy_data(sdata, ptr2, data));
DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data));
Ev_t* tp = 0;
int i;
......@@ -1441,7 +1453,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
if (loop == 1)
{
if (alloc_mem(data, ptr) != 0)
DBUG_RETURN(-1);
DBUG_RETURN_EVENT(-1);
*data->sdata = *sdata;
data->sdata->operation = tp->t3;
}
......@@ -1558,7 +1570,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
// free old data
NdbMem_Free((char*)olddata.memory);
DBUG_RETURN(0);
DBUG_RETURN_EVENT(0);
}
NdbEventOperationImpl *
......@@ -1581,11 +1593,11 @@ NdbEventBuffer::move_data()
}
if (!m_available_data.is_empty())
{
DBUG_ENTER("NdbEventBuffer::move_data");
DBUG_ENTER_EVENT("NdbEventBuffer::move_data");
#ifdef VM_TRACE
DBUG_PRINT("exit",("m_available_data_count %u", m_available_data.m_count));
DBUG_PRINT_EVENT("exit",("m_available_data_count %u", m_available_data.m_count));
#endif
DBUG_RETURN(m_available_data.m_head->m_event_op);
DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
}
return 0;
}
......