Commit 5bef4550 authored by unknown's avatar unknown

Implemented fragmented signal from API

primarily to enable creation of tables with more attributes than 91 (up to 128)

Intermediate push for testing
CHUNK_SZ will be set higher in real implementation and API_TRACE added

parent 55833fb4
...@@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() : ...@@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() :
theTransporterRegistry(0), theTransporterRegistry(0),
theStopReceive(0), theStopReceive(0),
theSendThread(NULL), theSendThread(NULL),
theReceiveThread(NULL) theReceiveThread(NULL),
m_fragmented_signal_id(0)
{ {
theOwnId = 0; theOwnId = 0;
...@@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ ...@@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
return (ss == SEND_OK ? 0 : -1); return (ss == SEND_OK ? 0 : -1);
} }
#define CHUNK_SZ 100u
int
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
LinearSectionPtr ptr[3], Uint32 secs){
NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
LinearSectionPtr tmp_ptr[3];
Uint32 unique_id= m_fragmented_signal_id++; // next unique id
unsigned i;
for (i= 0; i < secs; i++)
tmp_ptr[i]= ptr[i];
unsigned start_i= 0;
unsigned chunk_sz= 0;
unsigned fragment_info= 0;
Uint32 *tmp_data= tmp_signal.getDataPtrSend();
for (i= 0; i < secs;) {
unsigned save_sz= tmp_ptr[i].sz;
tmp_data[i-start_i]= i;
if (chunk_sz + save_sz > CHUNK_SZ) {
// truncate
unsigned send_sz= CHUNK_SZ - chunk_sz;
tmp_ptr[i].sz= send_sz;
if (fragment_info < 2)
fragment_info++;
// send tmp_signal
tmp_data[i-start_i+1]= unique_id;
tmp_signal.setLength(i-start_i+2);
tmp_signal.m_fragmentInfo= fragment_info;
// do prepare send
{
int ret;
if(getIsNodeSendable(aNode) == true){
SendStatus ss = theTransporterRegistry->prepareSend
(&tmp_signal,
1, // JBB
tmp_signal.getDataPtrSend(),
aNode,
&ptr[start_i]);
assert(ss != SEND_MESSAGE_TOO_BIG);
ret = (ss == SEND_OK ? 0 : -1);
} else
ret = -1;
if (ret != SEND_OK)
return ret;
}
// setup variables for next signal
start_i= i;
chunk_sz= 0;
tmp_ptr[i].sz= save_sz-send_sz;
tmp_ptr[i].p+= send_sz;
}
else
{
chunk_sz+= save_sz;
i++;
}
}
unsigned a_sz= aSignal->getLength();
if (fragment_info > 0) {
// update the original signal to include section info
Uint32 *a_data= aSignal->getDataPtrSend();
unsigned tmp_sz= i-start_i;
memcpy(a_data+a_sz,
tmp_data,
tmp_sz*sizeof(Uint32));
a_data[a_sz+tmp_sz]= unique_id;
aSignal->setLength(a_sz+tmp_sz+1);
// send last fragment
aSignal->m_fragmentInfo= 3;
aSignal->m_noOfSections= i-start_i;
} else {
aSignal->m_noOfSections= secs;
}
// send aSignal
int ret;
if(getIsNodeSendable(aNode) == true){
SendStatus ss = theTransporterRegistry->prepareSend
(aSignal,
1, // JBB
aSignal->getDataPtrSend(),
aNode,
&ptr[start_i]);
assert(ss != SEND_MESSAGE_TOO_BIG);
ret = (ss == SEND_OK ? 0 : -1);
} else
ret = -1;
aSignal->m_noOfSections = 0;
aSignal->m_fragmentInfo = 0;
aSignal->setLength(a_sz);
return ret;
}
#if 0
int int
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
LinearSectionPtr ptr[3], Uint32 secs){ LinearSectionPtr ptr[3], Uint32 secs){
...@@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, ...@@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
aSignal->m_noOfSections = 0; aSignal->m_noOfSections = 0;
return -1; return -1;
} }
#endif
int int
...@@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, ...@@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal,
aSignal->theSendersBlockRef = tmp; aSignal->theSendersBlockRef = tmp;
} }
#endif #endif
SendStatus ss = theTransporterRegistry->prepareSend(aSignal, SendStatus ss =
1, // JBB theTransporterRegistry->prepareSend(aSignal, 1, // JBB
aSignal->getDataPtrSend(), aSignal->getDataPtrSend(),
aNode, aNode, ptr);
ptr);
assert(ss != SEND_MESSAGE_TOO_BIG); assert(ss != SEND_MESSAGE_TOO_BIG);
aSignal->m_noOfSections = 0; aSignal->m_noOfSections = 0;
return (ss == SEND_OK ? 0 : -1); return (ss == SEND_OK ? 0 : -1);
......
...@@ -224,7 +224,8 @@ private: ...@@ -224,7 +224,8 @@ private:
} m_threads; } m_threads;
Uint32 m_max_trans_id; Uint32 m_max_trans_id;
Uint32 m_fragmented_signal_id;
/** /**
* execute function * execute function
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment