Bug#25387 - ndb: dbug assert in reference counting for event operations

- blob event operation not reference counted correctly, missing TE_ACTIVE
- add reference counting for blobs events
- make sure also blob event operations get TE_ACTIVE
- some minor cleanups + adjustment of dbug prints
parent c3b75121
...@@ -215,8 +215,6 @@ NdbEventOperationImpl::~NdbEventOperationImpl() ...@@ -215,8 +215,6 @@ NdbEventOperationImpl::~NdbEventOperationImpl()
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
stop(); stop();
// m_bufferHandle->dropSubscribeEvent(m_bufferId);
; // ToDo? We should send stop signal here
if (theMainOp == NULL) if (theMainOp == NULL)
{ {
...@@ -428,7 +426,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n) ...@@ -428,7 +426,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
// create blob event operation // create blob event operation
tBlobOp = tBlobOp =
m_ndb->theEventBuffer->createEventOperation(*blobEvnt, m_error); m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
if (tBlobOp == NULL) if (tBlobOp == NULL)
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
...@@ -561,6 +559,8 @@ NdbEventOperationImpl::execute_nolock() ...@@ -561,6 +559,8 @@ NdbEventOperationImpl::execute_nolock()
m_state= EO_EXECUTING; m_state= EO_EXECUTING;
mi_type= m_eventImpl->mi_type; mi_type= m_eventImpl->mi_type;
m_ndb->theEventBuffer->add_op(); m_ndb->theEventBuffer->add_op();
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this); int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
if (r == 0) { if (r == 0) {
if (theMainOp == NULL) { if (theMainOp == NULL) {
...@@ -568,19 +568,24 @@ NdbEventOperationImpl::execute_nolock() ...@@ -568,19 +568,24 @@ NdbEventOperationImpl::execute_nolock()
NdbEventOperationImpl* blob_op = theBlobOpList; NdbEventOperationImpl* blob_op = theBlobOpList;
while (blob_op != NULL) { while (blob_op != NULL) {
r = blob_op->execute_nolock(); r = blob_op->execute_nolock();
if (r != 0) if (r != 0) {
break; break;
}
// blob event op now holds reference
// cleared by TE_STOP or TE_CLUSTER_FAILURE
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
blob_op = blob_op->m_next; blob_op = blob_op->m_next;
} }
} }
if (r == 0) if (r == 0)
{ {
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
} }
//Error //Error
m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
m_state= EO_ERROR; m_state= EO_ERROR;
mi_type= 0; mi_type= 0;
m_magic_number= 0; m_magic_number= 0;
...@@ -1582,6 +1587,33 @@ NdbEventBuffer::complete_outof_order_gcis() ...@@ -1582,6 +1587,33 @@ NdbEventBuffer::complete_outof_order_gcis()
ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI); ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI);
} }
void
NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
SubTableData &data,
LinearSectionPtr *ptr,
Uint32 &oid_ref)
{
NdbEventOperationImpl *dropped_ev_op = m_dropped_ev_op;
do
{
do
{
oid_ref = impl->m_oid;
insertDataL(impl, &data, ptr);
NdbEventOperationImpl* blob_op = impl->theBlobOpList;
while (blob_op != NULL)
{
oid_ref = blob_op->m_oid;
insertDataL(blob_op, &data, ptr);
blob_op = blob_op->m_next;
}
} while((impl = impl->m_next));
impl = dropped_ev_op;
dropped_ev_op = NULL;
} while (impl);
}
void void
NdbEventBuffer::report_node_connected(Uint32 node_id) NdbEventBuffer::report_node_connected(Uint32 node_id)
{ {
...@@ -1606,21 +1638,8 @@ NdbEventBuffer::report_node_connected(Uint32 node_id) ...@@ -1606,21 +1638,8 @@ NdbEventBuffer::report_node_connected(Uint32 node_id)
/** /**
* Insert this event for each operation * Insert this event for each operation
*/ */
{ // no need to lock()/unlock(), receive thread calls this
// no need to lock()/unlock(), receive thread calls this insert_event(&op->m_impl, data, ptr, data.senderData);
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1648,21 +1667,8 @@ NdbEventBuffer::report_node_failure(Uint32 node_id) ...@@ -1648,21 +1667,8 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
/** /**
* Insert this event for each operation * Insert this event for each operation
*/ */
{ // no need to lock()/unlock(), receive thread calls this
// no need to lock()/unlock(), receive thread calls this insert_event(&op->m_impl, data, ptr, data.senderData);
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1693,21 +1699,8 @@ NdbEventBuffer::completeClusterFailed() ...@@ -1693,21 +1699,8 @@ NdbEventBuffer::completeClusterFailed()
/** /**
* Insert this event for each operation * Insert this event for each operation
*/ */
{ // no need to lock()/unlock(), receive thread calls this
// no need to lock()/unlock(), receive thread calls this insert_event(&op->m_impl, data, ptr, data.senderData);
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
/** /**
* Release all GCI's with m_gci > gci * Release all GCI's with m_gci > gci
...@@ -1797,17 +1790,36 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1797,17 +1790,36 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{ {
case NdbDictionary::Event::_TE_NODE_FAILURE: case NdbDictionary::Event::_TE_NODE_FAILURE:
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri)); op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
DBUG_PRINT("info",
("_TE_NODE_FAILURE: m_ref_count: %u for op: %p id: %u",
op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
break; break;
case NdbDictionary::Event::_TE_ACTIVE: case NdbDictionary::Event::_TE_ACTIVE:
op->m_node_bit_mask.set(SubTableData::getNdbdNodeId(ri)); op->m_node_bit_mask.set(SubTableData::getNdbdNodeId(ri));
// internal event, do not relay to user // internal event, do not relay to user
DBUG_PRINT("info",
("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
break; break;
case NdbDictionary::Event::_TE_CLUSTER_FAILURE: case NdbDictionary::Event::_TE_CLUSTER_FAILURE:
op->m_node_bit_mask.clear(); if (!op->m_node_bit_mask.isclear())
DBUG_ASSERT(op->m_ref_count > 0); {
op->m_ref_count--; op->m_node_bit_mask.clear();
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); DBUG_ASSERT(op->m_ref_count > 0);
op->m_ref_count--;
DBUG_PRINT("info", ("_TE_CLUSTER_FAILURE: m_ref_count: %u for op: %p",
op->m_ref_count, op));
if (op->theMainOp)
{
// blob event op, need to clear ref count in main op
DBUG_ASSERT(op->m_ref_count == 0);
DBUG_ASSERT(op->theMainOp->m_ref_count > 0);
op->theMainOp->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
op->theMainOp->m_ref_count, op->theMainOp));
}
}
break; break;
case NdbDictionary::Event::_TE_STOP: case NdbDictionary::Event::_TE_STOP:
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri)); op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
...@@ -1815,7 +1827,17 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1815,7 +1827,17 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{ {
DBUG_ASSERT(op->m_ref_count > 0); DBUG_ASSERT(op->m_ref_count > 0);
op->m_ref_count--; op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); DBUG_PRINT("info", ("_TE_STOP: m_ref_count: %u for op: %p",
op->m_ref_count, op));
if (op->theMainOp)
{
// blob event op, need to clear ref count in main op
DBUG_ASSERT(op->m_ref_count == 0);
DBUG_ASSERT(op->theMainOp->m_ref_count > 0);
op->theMainOp->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
op->theMainOp->m_ref_count, op->theMainOp));
}
} }
break; break;
default: default:
...@@ -2639,10 +2661,10 @@ NdbEventBuffer::createEventOperation(const char* eventName, ...@@ -2639,10 +2661,10 @@ NdbEventBuffer::createEventOperation(const char* eventName,
} }
NdbEventOperationImpl* NdbEventOperationImpl*
NdbEventBuffer::createEventOperation(NdbEventImpl& evnt, NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
NdbError &theError) NdbError &theError)
{ {
DBUG_ENTER("NdbEventBuffer::createEventOperation [evnt]"); DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt); NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
if (tOp == 0) if (tOp == 0)
{ {
......
...@@ -436,8 +436,8 @@ public: ...@@ -436,8 +436,8 @@ public:
Vector<Gci_container_pod> m_active_gci; Vector<Gci_container_pod> m_active_gci;
NdbEventOperation *createEventOperation(const char* eventName, NdbEventOperation *createEventOperation(const char* eventName,
NdbError &); NdbError &);
NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt, NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
NdbError &); NdbError &);
void dropEventOperation(NdbEventOperation *); void dropEventOperation(NdbEventOperation *);
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp); static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
...@@ -541,6 +541,11 @@ public: ...@@ -541,6 +541,11 @@ public:
#endif #endif
private: private:
void insert_event(NdbEventOperationImpl* impl,
SubTableData &data,
LinearSectionPtr *ptr,
Uint32 &oid_ref);
int expand(unsigned sz); int expand(unsigned sz);
// all allocated data // all allocated data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment