/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "Trix.hpp"

#include <string.h>
#include <kernel_types.h>
#include <NdbOut.hpp>

#include <signaldata/ReadNodesConf.hpp>
#include <signaldata/NodeFailRep.hpp>
#include <signaldata/DumpStateOrd.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/SumaImpl.hpp>
#include <signaldata/UtilPrepare.hpp>
#include <signaldata/UtilExecute.hpp>
#include <signaldata/UtilRelease.hpp>
#include <SectionReader.hpp>
#include <AttributeHeader.hpp>

#define CONSTRAINT_VIOLATION 893

#define DEBUG(x) { ndbout << "TRIX::" << x << endl; }

/**
 *
 */
Trix::Trix(const Configuration & conf) :
  SimulatedBlock(TRIX, conf),
  c_theNodes(c_theNodeRecPool),
  c_masterNodeId(0),
  c_masterTrixRef(0),
  c_noNodesFailed(0),
  c_noActiveNodes(0),
  c_theSubscriptions(c_theSubscriptionRecPool)
{
  BLOCK_CONSTRUCTOR(Trix);

  // Add received signals
  addRecSignal(GSN_STTOR,  &Trix::execSTTOR);
  addRecSignal(GSN_NDB_STTOR,  &Trix::execNDB_STTOR); // Forwarded from DICT
  addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
  addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
  addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
  addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
  addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);

  // Index build
  addRecSignal(GSN_BUILDINDXREQ, &Trix::execBUILDINDXREQ);
  // Dump testing
  addRecSignal(GSN_BUILDINDXCONF, &Trix::execBUILDINDXCONF);
  addRecSignal(GSN_BUILDINDXREF, &Trix::execBUILDINDXREF);


  addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
  addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
  addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
  addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
  addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
  addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);


  // Suma signals
  addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
  addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
  addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
  addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
  addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
  addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
  addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
  addRecSignal(GSN_SUB_META_DATA, &Trix::execSUB_META_DATA);
  addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);

  // Allocate pool sizes
  c_theAttrOrderBufferPool.setSize(100);
  c_theSubscriptionRecPool.setSize(100);

  ArrayList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
  SubscriptionRecPtr subptr;
  while(subscriptions.seize(subptr) == true) {
    new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
  }
  subscriptions.release();
}

/**
 *
 */
Trix::~Trix()
{
}

/**
 *
 */
void Trix::execSTTOR(Signal* signal) 
{
  jamEntry();                            

  //const Uint32 startphase   = signal->theData[1];
  const Uint32 theSignalKey = signal->theData[6];
  
  signal->theData[0] = theSignalKey;
  signal->theData[3] = 1;
  signal->theData[4] = 255; // No more start phases from missra
  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
  return;
}//Trix::execSTTOR()

/**
 *
 */
void Trix::execNDB_STTOR(Signal* signal) 
{
  jamEntry();                            
  BlockReference ndbcntrRef = signal->theData[0];	 
  Uint16 startphase = signal->theData[2];      /* RESTART PHASE           */
  Uint16 mynode = signal->theData[1];		 
  //Uint16 restarttype = signal->theData[3];	 
  //UintR configInfo1 = signal->theData[6];     /* CONFIGRATION INFO PART 1 */
  //UintR configInfo2 = signal->theData[7];     /* CONFIGRATION INFO PART 2 */
  switch (startphase) {
  case 3:
    jam();
    /* SYMBOLIC START PHASE 4             */
    /* ABSOLUTE PHASE 5                   */
    /* REQUEST NODE IDENTITIES FROM DBDIH */
    signal->theData[0] = calcTrixBlockRef(mynode);
    sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
    return;
    break;
  case 6:
    break;
  default:
    break;
  }
}

/**
 *
 */
void Trix::execREAD_NODESCONF(Signal* signal)
{
  jamEntry();

  ReadNodesConf * const  readNodes = (ReadNodesConf *)signal->getDataPtr();
  //Uint32 noOfNodes   = readNodes->noOfNodes;
  NodeRecPtr nodeRecPtr;

  c_masterNodeId = readNodes->masterNodeId;
  c_masterTrixRef = RNIL;
  c_noNodesFailed = 0;

  for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
    jam();
    if(NodeBitmask::get(readNodes->allNodes, i)) {
      // Node is defined
      jam();
      ndbrequire(c_theNodes.seizeId(nodeRecPtr, i));
      nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
      if (i == c_masterNodeId) {
        c_masterTrixRef = nodeRecPtr.p->trixRef;
      }
      if(NodeBitmask::get(readNodes->inactiveNodes, i)){
        // Node is not active
	jam();
	/**-----------------------------------------------------------------
	 * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
	 * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
	 * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE 
	 * NOT ALL NODES ARE ALIVE.
	 *------------------------------------------------------------------*/
	arrGuard(c_noNodesFailed, MAX_NDB_NODES);
	nodeRecPtr.p->alive = false;
	c_noNodesFailed++;
	c_blockState = Trix::NODE_FAILURE;
      }
      else {
        // Node is active
        jam();
        c_noActiveNodes++;
        nodeRecPtr.p->alive = true;
      }
    }
  }
  if (c_noNodesFailed == 0) {
    c_blockState = Trix::STARTED;
  }
}

/**
 *
 */
void Trix::execREAD_NODESREF(Signal* signal)
{
  // NYI
}

/**
 *
 */
void Trix::execNODE_FAILREP(Signal* signal)
{
  jamEntry();
  NodeFailRep * const  nodeFail = (NodeFailRep *) signal->getDataPtr();

  //Uint32 failureNr    = nodeFail->failNo;
  //Uint32 numberNodes  = nodeFail->noOfNodes;
  Uint32 masterNodeId = nodeFail->masterNodeId;

  NodeRecPtr nodeRecPtr;

  for(c_theNodes.first(nodeRecPtr); 
      nodeRecPtr.i != RNIL; 
      c_theNodes.next(nodeRecPtr)) {
    if(NodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
      nodeRecPtr.p->alive = false;
      c_noNodesFailed++;
      c_noActiveNodes--;      
    }
  }
  if (c_masterNodeId != masterNodeId) {
    c_masterNodeId = masterNodeId;
    NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
    c_masterTrixRef = nodeRec->trixRef;
  }
}

/**
 *
 */
void Trix::execINCL_NODEREQ(Signal* signal)
{
  jamEntry();
  UintR node_id = signal->theData[1];
  NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
  nodeRec->alive = true;
  c_noNodesFailed--;
  c_noActiveNodes++;      
  nodeRec->trixRef = calcTrixBlockRef(node_id);
  if (c_noNodesFailed == 0) {
    c_blockState = Trix::STARTED;
  }  
}

// Debugging
void
Trix::execDUMP_STATE_ORD(Signal* signal)
{
  jamEntry();

  DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();

  switch(dumpStateOrd->args[0]) {
  case(300): {// ok
    // index2 -T; index2 -I -n10000; index2 -c
    // all dump 300 0 0 0 0 0 4 2
    // select_count INDEX0000
    BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
    
    MEMCOPY_NO_WORDS(buildIndxReq, 
		     signal->theData + 1, 
		     BuildIndxReq::SignalLength);
    buildIndxReq->setUserRef(reference()); // return to me
    buildIndxReq->setParallelism(10);
    Uint32 indexColumns[1] = {1};
    Uint32 keyColumns[1] = {0};
    struct LinearSectionPtr orderPtr[2];
    buildIndxReq->setColumnOrder(indexColumns, 1, keyColumns, 1, orderPtr);
    sendSignal(reference(),
	       GSN_BUILDINDXREQ,
	       signal,
	       BuildIndxReq::SignalLength,
	       JBB,
	       orderPtr,
	       BuildIndxReq::NoOfSections);
    break;
  }
  case(301): { // ok
    // index2 -T; index2 -I -n10000; index2 -c -p
    // all dump 301 0 0 0 0 0 4 2
    // select_count INDEX0000
    BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
    
    MEMCOPY_NO_WORDS(buildIndxReq, 
		     signal->theData + 1, 
		     BuildIndxReq::SignalLength);
    buildIndxReq->setUserRef(reference()); // return to me
    buildIndxReq->setParallelism(10);
    Uint32 indexColumns[2] = {0, 1};
    Uint32 keyColumns[1] = {0};
    struct LinearSectionPtr orderPtr[2];
    buildIndxReq->setColumnOrder(indexColumns, 2, keyColumns, 1, orderPtr);
    sendSignal(reference(),
	       GSN_BUILDINDXREQ,
	       signal,
	       BuildIndxReq::SignalLength,
	       JBB,
	       orderPtr,
	       BuildIndxReq::NoOfSections);
    break;
  }
  case(302): { // ok
    // index -T; index -I -n1000; index -c -p
    // all dump 302 0 0 0 0 0 4 2
    // select_count PNUMINDEX0000
    BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
    
    MEMCOPY_NO_WORDS(buildIndxReq, 
		     signal->theData + 1, 
		     BuildIndxReq::SignalLength);
    buildIndxReq->setUserRef(reference()); // return to me
    buildIndxReq->setParallelism(10);
    Uint32 indexColumns[3] = {0, 3, 5};
    Uint32 keyColumns[1] = {0};
    struct LinearSectionPtr orderPtr[2];
    buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
    sendSignal(reference(),
	       GSN_BUILDINDXREQ,
	       signal,
	       BuildIndxReq::SignalLength,
	       JBB,
	       orderPtr,
	       BuildIndxReq::NoOfSections);
    break;
  }
  case(303): { // ok
    // index -T -2; index -I -2 -n1000; index -c -p
    // all dump 303 0 0 0 0 0 4 2
    // select_count PNUMINDEX0000
    BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
    
    MEMCOPY_NO_WORDS(buildIndxReq, 
		     signal->theData + 1, 
		     BuildIndxReq::SignalLength);
    buildIndxReq->setUserRef(reference()); // return to me
    buildIndxReq->setParallelism(10);
    Uint32 indexColumns[3] = {0, 3, 5};
    Uint32 keyColumns[2] = {0, 1};
    struct LinearSectionPtr orderPtr[2];
    buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
    sendSignal(reference(),
	       GSN_BUILDINDXREQ,
	       signal,
	       BuildIndxReq::SignalLength,
	       JBB,
	       orderPtr,
	       BuildIndxReq::NoOfSections);
    break;
  }
  case(304): { // ok
    // index -T -L; index -I -L -n1000; index -c -p
    // all dump 304 0 0 0 0 0 4 2
    // select_count PNUMINDEX0000
    BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
    
    MEMCOPY_NO_WORDS(buildIndxReq, 
		     signal->theData + 1, 
		     BuildIndxReq::SignalLength);
    buildIndxReq->setUserRef(reference()); // return to me
    buildIndxReq->setParallelism(10);
    Uint32 indexColumns[3] = {0, 3, 5};
    Uint32 keyColumns[1] = {0};
    struct LinearSectionPtr orderPtr[2];
    buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
    sendSignal(reference(),
	       GSN_BUILDINDXREQ,
	       signal,
	       BuildIndxReq::SignalLength,
	       JBB,
	       orderPtr,
	       BuildIndxReq::NoOfSections);
    break;
  }
  case(305): { // ok
    // index -T -2 -L; index -I -2 -L -n1000; index -c -p
    // all dump 305 0 0 0 0 0 4 2
    // select_count PNUMINDEX0000
    BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
    
    MEMCOPY_NO_WORDS(buildIndxReq, 
		     signal->theData + 1, 
		     BuildIndxReq::SignalLength);
    buildIndxReq->setUserRef(reference()); // return to me
    buildIndxReq->setParallelism(10);
    Uint32 indexColumns[3] = {0, 3, 5};
    Uint32 keyColumns[2] = {0, 1};
    struct LinearSectionPtr orderPtr[2];
    buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
    sendSignal(reference(),
	       GSN_BUILDINDXREQ,
	       signal,
	       BuildIndxReq::SignalLength,
	       JBB,
	       orderPtr,
	       BuildIndxReq::NoOfSections);
    break;
  }
  default: {
    // Ignore
  }
  }
}

// Build index
/**
 *
 */
void Trix:: execBUILDINDXREQ(Signal* signal)
{
  jamEntry();
  BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr();

  // Seize a subscription record
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;
  
  if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->getBuildId())) {
    // Failed to allocate subscription record
    BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();

    buildIndxRef->setErrorCode(BuildIndxRef::AllocationFailure);
    releaseSections(signal);
    sendSignal(buildIndxReq->getUserRef(), 
	       GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB);
    return;
  }
  subRec = subRecPtr.p;
  subRec->errorCode = BuildIndxRef::NoError;
  subRec->userReference = buildIndxReq->getUserRef();
  subRec->connectionPtr = buildIndxReq->getConnectionPtr();
  subRec->subscriptionId = buildIndxReq->getBuildId();
  subRec->subscriptionKey = buildIndxReq->getBuildKey();
  subRec->indexType = buildIndxReq->getIndexType();
  subRec->sourceTableId = buildIndxReq->getTableId();
  subRec->targetTableId = buildIndxReq->getIndexId();
  subRec->parallelism = buildIndxReq->getParallelism();
  subRec->expectedConf = 0;
  subRec->subscriptionCreated = false;
  subRec->pendingSubSyncContinueConf = false;
  subRec->prepareId = RNIL;

  // Get column order segments
  Uint32 noOfSections = signal->getNoOfSections();
  if(noOfSections > 0) {
    SegmentedSectionPtr ptr;
    signal->getSection(ptr, BuildIndxReq::INDEX_COLUMNS);
    append(subRec->attributeOrder, ptr, getSectionSegmentPool());
    subRec->noOfIndexColumns = ptr.sz;
  }
  if(noOfSections > 1) {
    SegmentedSectionPtr ptr;
    signal->getSection(ptr, BuildIndxReq::KEY_COLUMNS);
    append(subRec->attributeOrder, ptr, getSectionSegmentPool());
    subRec->noOfKeyColumns = ptr.sz;
  }
#if 0
  // Debugging
  printf("Trix:: execBUILDINDXREQ: Attribute order:\n");
  subRec->attributeOrder.print(stdout);
#endif
  releaseSections(signal);
  prepareInsertTransactions(signal, subRecPtr);
}

void Trix:: execBUILDINDXCONF(Signal* signal)
{
  printf("Trix:: execBUILDINDXCONF\n");
}

void Trix:: execBUILDINDXREF(Signal* signal)
{
  printf("Trix:: execBUILDINDXREF\n");
}

void Trix::execUTIL_PREPARE_CONF(Signal* signal)
{
  jamEntry();
  UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = utilPrepareConf->senderData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  subRec->prepareId = utilPrepareConf->prepareId;
  setupSubscription(signal, subRecPtr);
}

void Trix::execUTIL_PREPARE_REF(Signal* signal)
{
  jamEntry();
  UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = utilPrepareRef->senderData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  subRec->errorCode = BuildIndxRef::InternalError;
}

void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
{
  jamEntry();
  UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = utilExecuteConf->senderData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  subRec->expectedConf--;
  checkParallelism(signal, subRec);
  if (subRec->expectedConf == 0)
    buildComplete(signal, subRecPtr);
}

void Trix::execUTIL_EXECUTE_REF(Signal* signal)
{
  jamEntry();
  UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = utilExecuteRef->senderData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
  if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
    buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
  else
    buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
}

void Trix::execSUB_CREATE_CONF(Signal* signal)
{
  jamEntry();
  SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = subCreateConf->subscriberData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  ndbrequire(subRec->subscriptionId == subCreateConf->subscriptionId);
  ndbrequire(subRec->subscriptionKey == subCreateConf->subscriptionKey);
  subRec->subscriptionCreated = true;
  subRecPtr.p = subRec;
  setupTableScan(signal, subRecPtr);
}

void Trix::execSUB_CREATE_REF(Signal* signal)
{
  jamEntry();
  // THIS SIGNAL IS NEVER SENT FROM SUMA?
  /*
  SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = subCreateRef->subscriberData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
  */
}

void Trix::execSUB_SYNC_CONF(Signal* signal)
{
  jamEntry();
  SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;
  
  subRecPtr.i = subSyncConf->subscriberData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  ndbrequire(subRec->subscriptionId == subSyncConf->subscriptionId);
  ndbrequire(subRec->subscriptionKey == subSyncConf->subscriptionKey);
  subRecPtr.p = subRec;
  if(subSyncConf->part == SubscriptionData::MetaData)
    startTableScan(signal, subRecPtr);
  else {
    subRec->expectedConf--;
    checkParallelism(signal, subRec);
    if (subRec->expectedConf == 0)
      buildComplete(signal, subRecPtr);
  }
}

void Trix::execSUB_SYNC_REF(Signal* signal)
{
  jamEntry();
  SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;

  subRecPtr.i = subSyncRef->subscriberData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
}

void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
{
  SubSyncContinueReq  * subSyncContinueReq = 
    (SubSyncContinueReq *) signal->getDataPtr();
  
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;
  subRecPtr.i = subSyncContinueReq->subscriberData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  subRec->pendingSubSyncContinueConf = true;
  checkParallelism(signal, subRec);
}

void Trix::execSUB_META_DATA(Signal* signal)
{
  jamEntry();
}

void Trix::execSUB_TABLE_DATA(Signal* signal)
{
  jamEntry();
  SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
  SubscriptionRecPtr subRecPtr;
  SubscriptionRecord* subRec;
  subRecPtr.i = subTableData->subscriberData;
  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
    printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
    return;
  }
  subRecPtr.p = subRec;
  SegmentedSectionPtr headerPtr, dataPtr;
  if (!signal->getSection(headerPtr, 0)) {
    printf("Trix::execSUB_TABLE_DATA: Failed to get header section\n");
  }
  if (!signal->getSection(dataPtr, 1)) {
    printf("Trix::execSUB_TABLE_DATA: Failed to get data section\n");
  }
  executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr);
}

void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
{
  Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
  SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
  SubscriptionRecord* subRec = subRecPtr.p;
//  Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
  AttrOrderBuffer::DataBufferIterator iter;
  Uint32 i = 0;

  jam();
  bool moreAttributes = subRec->attributeOrder.first(iter);
  while (moreAttributes) {
    attributeList[i++] = *iter.data;
    moreAttributes = subRec->attributeOrder.next(iter);
  }
  // Merge index and key column segments
  struct LinearSectionPtr orderPtr[3];
  orderPtr[0].p = attributeList;
  orderPtr[0].sz = subRec->attributeOrder.getSize();


  subCreateReq->subscriberRef = reference();
  subCreateReq->subscriberData = subRecPtr.i;
  subCreateReq->subscriptionId = subRec->subscriptionId;
  subCreateReq->subscriptionKey = subRec->subscriptionKey;
  subCreateReq->tableId = subRec->sourceTableId;
  subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
  
  sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, 
	     signal, SubCreateReq::SignalLength, JBB, orderPtr, 1);
}

void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
{
  SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();

  jam();
  subSyncReq->subscriptionId = subRecPtr.i;
  subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
  subSyncReq->part = SubscriptionData::MetaData;
  sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 
	     signal, SubSyncReq::SignalLength, JBB);
}

void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
{
  jam();
  subRecPtr.p->expectedConf = 1;
  SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();

  subSyncReq->subscriptionId = subRecPtr.i;
  subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
  subSyncReq->part = SubscriptionData::TableData;
  sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 
	     signal, SubSyncReq::SignalLength, JBB);
}

void Trix::prepareInsertTransactions(Signal* signal, 
				     SubscriptionRecPtr subRecPtr)
{
  SubscriptionRecord* subRec = subRecPtr.p;
  UtilPrepareReq * utilPrepareReq = 
    (UtilPrepareReq *)signal->getDataPtrSend();
  
  jam();
  utilPrepareReq->senderRef = reference();
  utilPrepareReq->senderData = subRecPtr.i;

  const Uint32 pageSizeInWords = 128;
  Uint32 propPage[pageSizeInWords];
  LinearWriter w(&propPage[0],128);
  w.first();
  w.add(UtilPrepareReq::NoOfOperations, 1);
  w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
  w.add(UtilPrepareReq::TableId, subRec->targetTableId);
  // Add index attributes in increasing order and one PK attribute
  for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
    w.add(UtilPrepareReq::AttributeId, i);

#if 0
  // Debugging
  SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
  printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
  reader.printAll(ndbout);
#endif

  struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
  sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
  sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
  sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
	     UtilPrepareReq::SignalLength, JBB, 
	     sectionsPtr, UtilPrepareReq::NoOfSections);
}

void Trix::executeInsertTransaction(Signal* signal, 
				    SubscriptionRecPtr subRecPtr,
				    SegmentedSectionPtr headerPtr,
				    SegmentedSectionPtr dataPtr)
{
  jam();
  SubscriptionRecord* subRec = subRecPtr.p;
  UtilExecuteReq * utilExecuteReq = 
    (UtilExecuteReq *)signal->getDataPtrSend();
  Uint32* headerBuffer = signal->theData + 25;
  Uint32*  dataBuffer = headerBuffer + headerPtr.sz;

  utilExecuteReq->senderRef = reference();
  utilExecuteReq->senderData = subRecPtr.i;
  utilExecuteReq->prepareId = subRec->prepareId;
#if 0
  printf("Header size %u\n", headerPtr.sz);
  for(int i = 0; i < headerPtr.sz; i++)
    printf("H'%.8x ", headerBuffer[i]);
  printf("\n");
  
  printf("Data size %u\n", dataPtr.sz);
  for(int i = 0; i < dataPtr.sz; i++)
    printf("H'%.8x ", dataBuffer[i]);
  printf("\n");
#endif
  // Save scan result in linear buffers
  copy(headerBuffer, headerPtr);
  copy(dataBuffer, dataPtr);

  // Calculate packed key size
  Uint32 noOfKeyData = 0;
  for(Uint32 i = 0; i < headerPtr.sz; i++) {
    AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;

    // Filter out NULL attributes
    if (keyAttrHead->isNULL())
      return;

    if (i < subRec->noOfIndexColumns)
      // Renumber index attributes in consequtive order
      keyAttrHead->setAttributeId(i);
    else
      // Calculate total size of PK attribute
      noOfKeyData += keyAttrHead->getDataSize();
  }
  // Increase expected CONF count
  subRec->expectedConf++;

  // Pack key attributes
  AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
			subRec->noOfIndexColumns,
			noOfKeyData);

  struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
  sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
  sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = 
    subRec->noOfIndexColumns + 1;
  sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
  sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
  sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
	     UtilExecuteReq::SignalLength, JBB,
	     sectionsPtr, UtilExecuteReq::NoOfSections);
}

void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
{
  SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
  req->senderRef       = reference();
  req->senderData      = subRecPtr.i;
  req->subscriptionId  = subRecPtr.p->subscriptionId;
  req->subscriptionKey = subRecPtr.p->subscriptionKey;
  sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
	     SubRemoveReq::SignalLength, JBB);
}

void Trix::buildFailed(Signal* signal, 
		       SubscriptionRecPtr subRecPtr, 
		       BuildIndxRef::ErrorCode errorCode)
{
  SubscriptionRecord* subRec = subRecPtr.p;

  subRec->errorCode = errorCode;
  // Continue accumulating since we currently cannot stop SUMA
  subRec->expectedConf--;
  checkParallelism(signal, subRec);
  if (subRec->expectedConf == 0)
    buildComplete(signal, subRecPtr);
}

void
Trix::execSUB_REMOVE_REF(Signal* signal){
  jamEntry();
  //@todo
  ndbrequire(false);
}

void
Trix::execSUB_REMOVE_CONF(Signal* signal){
  jamEntry();

  SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();

  SubscriptionRecPtr subRecPtr;
  c_theSubscriptions.getPtr(subRecPtr, conf->senderData);

  if(subRecPtr.p->prepareId != RNIL){
    jam();

    UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
    req->prepareId = subRecPtr.p->prepareId;
    req->senderData = subRecPtr.i;

    sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal, 
	       UtilReleaseReq::SignalLength , JBB);  
    return;
  }
  
  {
    UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
    conf->senderData = subRecPtr.i;
    execUTIL_RELEASE_CONF(signal);
  }
}

void
Trix::execUTIL_RELEASE_REF(Signal* signal){
  jamEntry();
  ndbrequire(false);
}

void
Trix::execUTIL_RELEASE_CONF(Signal* signal){
  
  UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
  
  SubscriptionRecPtr subRecPtr;
  c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
  
  if(subRecPtr.p->errorCode == BuildIndxRef::NoError){
    // Build is complete, reply to original sender
    BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend();
    buildIndxConf->setUserRef(subRecPtr.p->userReference);
    buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr);
    buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX);
    buildIndxConf->setIndexType(subRecPtr.p->indexType);
    buildIndxConf->setTableId(subRecPtr.p->sourceTableId);
    buildIndxConf->setIndexId(subRecPtr.p->targetTableId);
    
    sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal, 
	       BuildIndxConf::SignalLength , JBB);  
  } else {
    // Build failed, reply to original sender
    BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
    buildIndxRef->setUserRef(subRecPtr.p->userReference);
    buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr);
    buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX);
    buildIndxRef->setIndexType(subRecPtr.p->indexType);
    buildIndxRef->setTableId(subRecPtr.p->sourceTableId);
    buildIndxRef->setIndexId(subRecPtr.p->targetTableId);
    buildIndxRef->setErrorCode(subRecPtr.p->errorCode);
    
    sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal, 
	       BuildIndxRef::SignalLength , JBB);  
  }

  // Release subscription record
  subRecPtr.p->attributeOrder.release();
  c_theSubscriptions.release(subRecPtr.i);
}

void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
{
  if ((subRec->pendingSubSyncContinueConf) &&
      (subRec->expectedConf < subRec->parallelism)) {
    SubSyncContinueConf  * subSyncContinueConf = 
      (SubSyncContinueConf *) signal->getDataPtrSend();
    subSyncContinueConf->subscriptionId = subRec->subscriptionId;
    subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
    sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 
	       SubSyncContinueConf::SignalLength , JBB);  
    subRec->pendingSubSyncContinueConf = false;
  }
}

BLOCK_FUNCTIONS(Trix)

template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);