/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "Suma.hpp"

#include <ndb_version.h>

#include <NdbTCP.h>
#include <Bitmask.hpp>
#include <SimpleProperties.hpp>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>

#include <signaldata/ListTables.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/GetTableId.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/SumaImpl.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/TransIdAI.hpp>
#include <signaldata/CreateTrig.hpp>
#include <signaldata/AlterTrig.hpp>
#include <signaldata/DropTrig.hpp>
#include <signaldata/FireTrigOrd.hpp>
#include <signaldata/TrigAttrInfo.hpp>
#include <signaldata/CheckNodeGroups.hpp>
#include <signaldata/GCPSave.hpp>
#include <GrepError.hpp>

#include <DebuggerNames.hpp>

//#define HANDOVER_DEBUG
//#define NODEFAIL_DEBUG
//#define NODEFAIL_DEBUG2
//#define DEBUG_SUMA_SEQUENCE
//#define EVENT_DEBUG
//#define EVENT_PH3_DEBUG
//#define EVENT_DEBUG2

/**
 * @todo:
 * SUMA crashes if an index is created at the same time as
 * global replication. Very easy to reproduce using testIndex.
 * Note: This only happens occasionally, but is quite easy to reprod.
 */

Uint32 g_subPtrI = RNIL;
static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;


/**************************************************************
 *
 * Start of suma
 *
 */

#define PRINT_ONLY 0
static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;

void
Suma::getNodeGroupMembers(Signal* signal) {
  jam();
  /**
   * Ask DIH for nodeGroupMembers
   */
  CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
  sd->blockRef = reference();
  sd->requestType =
    CheckNodeGroups::Direct |
    CheckNodeGroups::GetNodeGroupMembers;
  sd->nodeId = getOwnNodeId();
  EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, 
		 CheckNodeGroups::SignalLength);
  jamEntry();
  
  c_nodeGroup = sd->output;
  c_noNodesInGroup = 0;
  for (int i = 0; i < MAX_NDB_NODES; i++) {
    if (sd->mask.get(i)) {
      if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
      c_nodesInGroup[c_noNodesInGroup] = i;
      c_noNodesInGroup++;
    }
  }

  //  ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
  ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup

#ifdef NODEFAIL_DEBUG
  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
    ndbout_c ("Suma: NodeGroup %u, me %u, me in group %u, member[%u] %u",
	      c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
	      i, c_nodesInGroup[i]);
  }
#endif
}

void
Suma::execSTTOR(Signal* signal) {
  jamEntry();                            
  
  const Uint32 startphase  = signal->theData[1];
  const Uint32 typeOfStart = signal->theData[7];

#ifdef NODEFAIL_DEBUG
  ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u",
	    startphase, typeOfStart);

#endif

  if(startphase == 1){
    jam();
    c_restartLock = true;
  }

  if(startphase == 3){
    jam();
    g_TypeOfStart = typeOfStart;
    signal->theData[0] = reference();
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);

#if 0

    /**
     * Debug
     */

    
    SubscriptionPtr subPtr;
    Ptr<SyncRecord> syncPtr;
    ndbrequire(c_subscriptions.seize(subPtr));
    ndbrequire(c_syncPool.seize(syncPtr));
    

    ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i);

    subPtr.p->m_syncPtrI = syncPtr.i;
    subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot;
    syncPtr.p->m_subscriptionPtrI = subPtr.i;
    syncPtr.p->ptrI = syncPtr.i;
    g_subPtrI = subPtr.i;
    //    sendSTTORRY(signal);
#endif    
    return;
  }

  if(startphase == 5) {
    getNodeGroupMembers(signal);
    if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
      jam();
      for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
	Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
	if (ref != reference())
	  sendSignal(ref, GSN_SUMA_START_ME, signal,
		     1 /*SumaStartMe::SignalLength*/, JBB);
      }
    }
  }
  
  if(startphase == 7) {
    c_restartLock = false; // may be set false earlier with HANDOVER_REQ
    
    if (g_TypeOfStart != NodeState::ST_NODE_RESTART) {
      for( int i = 0; i < NO_OF_BUCKETS; i++) {
	if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
	  // I'm running this bucket
#ifdef EVENT_DEBUG
	  ndbout_c("bucket %u set to true", i);
#endif
	  c_buckets[i].active = true;
	}
      }
    }

    if(g_TypeOfStart == NodeState::ST_INITIAL_START &&
       c_masterNodeId == getOwnNodeId()) {
      jam();
      createSequence(signal);
      return;
    }//if
  }//if
  

  sendSTTORRY(signal);
  
  return;
}

void
Suma::createSequence(Signal* signal)
{
  jam();

  UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
  
  req->senderData  = RNIL;
  req->sequenceId  = SUMA_SEQUENCE;
  req->requestType = UtilSequenceReq::Create;
#ifdef DEBUG_SUMA_SEQUENCE
  ndbout_c("SUMA: Create sequence");
#endif
  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
	     signal, UtilSequenceReq::SignalLength, JBB);
  // execUTIL_SEQUENCE_CONF will call createSequenceReply()
}

void
Suma::createSequenceReply(Signal* signal,
			  UtilSequenceConf * conf,
			  UtilSequenceRef * ref)
{
  jam();

  if (ref != NULL)
    ndbrequire(false);

  sendSTTORRY(signal);
}

void
Suma::execREAD_NODESCONF(Signal* signal){
  jamEntry();
  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
 
  c_aliveNodes.clear();
  c_preparingNodes.clear();

  Uint32 count = 0;
  for(Uint32 i = 0; i < MAX_NDB_NODES; i++){
    if(NodeBitmask::get(conf->allNodes, i)){
      jam();
      
      count++;

      NodePtr node;
      ndbrequire(c_nodes.seize(node));
      
      node.p->nodeId = i;
      if(NodeBitmask::get(conf->inactiveNodes, i)){
	jam();
	node.p->alive = 0;
      } else {
	jam();
	node.p->alive = 1;
	c_aliveNodes.set(i);
      }
    } else
      jam();
  }
  c_masterNodeId = conf->masterNodeId;
  ndbrequire(count == conf->noOfNodes);

  sendSTTORRY(signal);
}

#if 0
void
Suma::execREAD_CONFIG_REQ(Signal* signal) 
{
  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;
  ndbrequire(req->noOfParameters == 0);

  jamEntry();

  const ndb_mgm_configuration_iterator * p = 
    theConfiguration.getOwnConfigIterator();
  ndbrequire(p != 0);
  
  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_FILES, 
					&cnoLogFiles));
  ndbrequire(cnoLogFiles > 0);

  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &cfragrecFileSize));
  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TABLE, &ctabrecFileSize));
  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TC_CONNECT, 
					&ctcConnectrecFileSize));
  clogFileFileSize       = 4 * cnoLogFiles;
  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
  cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG;

  initRecords();
  initialiseRecordsLab(signal, 0, ref, senderData);
  
  return;
}//Dblqh::execSIZEALT_REP()
#endif

void
Suma::sendSTTORRY(Signal* signal){
  signal->theData[0] = 0;
  signal->theData[3] = 1;
  signal->theData[4] = 3;
  signal->theData[5] = 5;
  signal->theData[6] = 7;
  signal->theData[7] = 255; // No more start phases from missra
  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
}

void
Suma::execNDB_STTOR(Signal* signal) 
{
  jamEntry();                            
}

void
Suma::execCONTINUEB(Signal* signal){
  jamEntry();
}

void
SumaParticipant::execCONTINUEB(Signal* signal) 
{
  jamEntry();
}

/*****************************************************************************
 * 
 * Node state handling
 *
 *****************************************************************************/

void Suma::execAPI_FAILREQ(Signal* signal) 
{
  jamEntry();
  Uint32 failedApiNode = signal->theData[0];
  //BlockReference retRef = signal->theData[1];

  c_failedApiNodes.set(failedApiNode);
  bool found = removeSubscribersOnNode(signal, failedApiNode);

  if(!found){
    jam();
    c_failedApiNodes.clear(failedApiNode);
  }
}//execAPI_FAILREQ()

bool
SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
{
  bool found = false;

  SubscriberPtr i_subbPtr;
  c_dataSubscribers.first(i_subbPtr);
  while(!i_subbPtr.isNull()){
    SubscriberPtr subbPtr = i_subbPtr;
    c_dataSubscribers.next(i_subbPtr);
    jam();
    if (refToNode(subbPtr.p->m_subscriberRef) == nodeId) {
      jam();
      c_dataSubscribers.remove(subbPtr);
      c_removeDataSubscribers.add(subbPtr);
      found = true;
    }
  }
  if(found){
    jam();
    sendSubStopReq(signal);
  }
  return found;
}

void
SumaParticipant::sendSubStopReq(Signal *signal){
  static bool remove_lock = false;
  jam();

  if(remove_lock) {
    jam();
    return;
  }
  remove_lock = true;

  SubscriberPtr subbPtr;
  c_removeDataSubscribers.first(subbPtr);
  if (subbPtr.isNull()){
    jam();
#if 0
    signal->theData[0] = failedApiNode;
    signal->theData[1] = reference();
    sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
#endif
    c_failedApiNodes.clear();

    remove_lock = false;
    return;
  }

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);

  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
  req->senderRef       = reference();
  req->senderData      = subbPtr.i;
  req->subscriberRef   = subbPtr.p->m_subscriberRef;
  req->subscriberData  = subbPtr.p->m_subscriberData;
  req->subscriptionId  = subPtr.p->m_subscriptionId;
  req->subscriptionKey = subPtr.p->m_subscriptionKey;
  req->part = SubscriptionData::TableData;

  sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
}

void
SumaParticipant::execSUB_STOP_CONF(Signal* signal){
  jamEntry();

  SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();

  //  Uint32 subscriberData = conf->subscriberData;
  //  Uint32 subscriberRef = conf->subscriberRef;

  Subscription key; 
  key.m_subscriptionId = conf->subscriptionId;
  key.m_subscriptionKey = conf->subscriptionKey;

  SubscriptionPtr subPtr;
  if(c_subscriptions.find(subPtr, key)) {
    jam();
    if (subPtr.p->m_markRemove) {
      jam();
      ndbrequire(false);
      ndbrequire(subPtr.p->m_nSubscribers > 0);
      subPtr.p->m_nSubscribers--;
      if (subPtr.p->m_nSubscribers == 0){
	jam();
	completeSubRemoveReq(signal, subPtr);
      }
    }
  }

  sendSubStopReq(signal);
}

void
SumaParticipant::execSUB_STOP_REF(Signal* signal){
  jamEntry();
  SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();

  Uint32 subscriptionId = ref->subscriptionId;
  Uint32 subscriptionKey = ref->subscriptionKey;
  Uint32 part = ref->part;
  Uint32 subscriberData = ref->subscriberData;
  Uint32 subscriberRef = ref->subscriberRef;
  //  Uint32 err = ref->err;

  if(!ref->isTemporary()){
    ndbrequire(false);
  }

  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
  req->subscriberRef = subscriberRef;
  req->subscriberData = subscriberData;
  req->subscriptionId = subscriptionId;
  req->subscriptionKey = subscriptionKey;
  req->part = part;

  sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
}

void
Suma::execNODE_FAILREP(Signal* signal){
  jamEntry();

  NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr();
  
  bool changed = false;

  NodePtr nodePtr;
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma: nodefailrep");
#endif
  c_nodeFailGCI = getFirstGCI(signal);

  for(c_nodes.first(nodePtr); nodePtr.i != RNIL; c_nodes.next(nodePtr)){
    if(NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)){
      if(nodePtr.p->alive){
	ndbassert(c_aliveNodes.get(nodePtr.p->nodeId));
	changed = true;
	jam();
      } else {
	ndbassert(!c_aliveNodes.get(nodePtr.p->nodeId));
	jam();
      }
      
      if (c_preparingNodes.get(nodePtr.p->nodeId)) {
	jam();
	// we are currently preparing this node that died
	// it's ok just to clear and go back to waiting for it to start up
	Restart.resetNode(calcSumaBlockRef(nodePtr.p->nodeId));
	c_preparingNodes.clear(nodePtr.p->nodeId);
      } else if (c_handoverToDo) {
	jam();
	// TODO what if I'm a SUMA that is currently restarting and the SUMA
	// responsible for restarting me is the one that died?

	// a node has failed whilst handover is going on
	// let's check if we're in the process of handover with that node
	c_handoverToDo = false;
	for( int i = 0; i < NO_OF_BUCKETS; i++) {
	  if (c_buckets[i].handover) {
	    // I'm doing handover, but is it with the dead node?
	    if (getResponsibleSumaNodeId(i) == nodePtr.p->nodeId) {
	      // so it was the dead node, has handover started?
	      if (c_buckets[i].handover_started) {
		jam();
		// we're not ok and will have lost data!
		// set not active to indicate this -
		// this will generate takeover behaviour
		c_buckets[i].active = false;
		c_buckets[i].handover_started = false;
	      } // else we're ok to revert back to state before 
	      c_buckets[i].handover = false;
	    } else {
	      jam();
	      // ok, we're doing handover with a different node
	      c_handoverToDo = true;
	    }
	  }
	}
      }

      c_failoverBuffer.nodeFailRep();

      nodePtr.p->alive = 0;
      c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above
    }
  }
}

void
Suma::execINCL_NODEREQ(Signal* signal){
  jamEntry();
  
  //const Uint32 senderRef = signal->theData[0];
  const Uint32 inclNode  = signal->theData[1];

  NodePtr node;
  for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)){
    jam();
    const Uint32 nodeId = node.p->nodeId;
    if(inclNode == nodeId){
      jam();
      
      ndbrequire(node.p->alive == 0);
      ndbrequire(!c_aliveNodes.get(nodeId));
      
      for (Uint32 j = 0; j < c_noNodesInGroup; j++) {
        jam();
	if (c_nodesInGroup[j] == nodeId) {
	  // the starting node is part of my node group
          jam();
	  c_preparingNodes.set(nodeId); // set as being prepared
	  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
            jam();
	    if (i == c_idInNodeGroup) {
              jam();
	      // I'm responsible for restarting this SUMA
	      // ALL dict's should have meta data info so it is ok to start
	      Restart.startNode(signal, calcSumaBlockRef(nodeId));
	      break;
	    }//if
	    if (c_aliveNodes.get(c_nodesInGroup[i])) {
              jam();
	      break; // another Suma takes care of this
	    }//if
	  }//for
	  break;
	}//if
      }//for

      node.p->alive = 1;
      c_aliveNodes.set(nodeId);

      break;
    }//if
  }//for

#if 0 // if we include this DIH's got to be prepared, later if needed...
  signal->theData[0] = reference();
  
  sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
#endif
}

void
Suma::execSIGNAL_DROPPED_REP(Signal* signal){
  jamEntry();
  ndbrequire(false);
}

/********************************************************************
 *
 * Dump state
 *
 */

void
Suma::execDUMP_STATE_ORD(Signal* signal){
  jamEntry();

  Uint32 tCase = signal->theData[0];
  if(tCase >= 8000 && tCase <= 8003){
    SubscriptionPtr subPtr;
    c_subscriptions.getPtr(subPtr, g_subPtrI);
    
    Ptr<SyncRecord> syncPtr;
    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
    
    if(tCase == 8000){
      syncPtr.p->startMeta(signal);
    }
    
    if(tCase == 8001){
      syncPtr.p->startScan(signal);
    }

    if(tCase == 8002){
      syncPtr.p->startTrigger(signal);
    }
    
    if(tCase == 8003){
      subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
      LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
      Uint32 tab = 0;
      Uint32 att[] = { 0, 1, 1 };
      syncPtr.p->m_tableList.append(&tab, 1);
      attrs.append(att, 3);
    }
  }

  if(tCase == 8004){
    infoEvent("Suma: c_subscriberPool  size: %d free: %d",
	      c_subscriberPool.getSize(),
	      c_subscriberPool.getNoOfFree());

    infoEvent("Suma: c_tablePool  size: %d free: %d",
	      c_tablePool_.getSize(),
	      c_tablePool_.getNoOfFree());

    infoEvent("Suma: c_subscriptionPool  size: %d free: %d",
	      c_subscriptionPool.getSize(),
	      c_subscriptionPool.getNoOfFree());

    infoEvent("Suma: c_syncPool  size: %d free: %d",
	      c_syncPool.getSize(),
	      c_syncPool.getNoOfFree());

    infoEvent("Suma: c_dataBufferPool  size: %d free: %d",
	      c_dataBufferPool.getSize(),
	      c_dataBufferPool.getNoOfFree());
  }
}

/********************************************************************
 *
 * Convert a table name (db+schema+tablename) to tableId
 *
 */

#if 0
void
SumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal)
{
  jam();
  if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) {
    jam();

    GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend();
    char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable];
    const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated
    req->senderRef  = reference();
    req->senderData = subPtr.i;
    req->len        = strLen;

    LinearSectionPtr ptr[1];
    ptr[0].p  = (Uint32*)tableName;
    ptr[0].sz = strLen;

    sendSignal(DBDICT_REF,
	       GSN_GET_TABLEID_REQ, 
	       signal, 
	       GetTableIdReq::SignalLength,
	       JBB,
	       ptr,
	       1);
  } else {
    jam();
    sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr);
  }
}
#endif


void 
SumaParticipant::addTableId(Uint32 tableId,
			    SubscriptionPtr subPtr, SyncRecord *psyncRec)
{
#ifdef NODEFAIL_DEBUG
  ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u",
	   tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);
#endif
  subPtr.p->m_tables[tableId] = 1;
  subPtr.p->m_currentTable++;
  if(psyncRec != NULL)
    psyncRec->m_tableList.append(&tableId, 1);  
}

#if 0
void 
SumaParticipant::execGET_TABLEID_CONF(Signal * signal)
{
  jamEntry();

  GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr();
  Uint32 tableId = conf->tableId;
  //Uint32 schemaVersion = conf->schemaVersion;  
  Uint32 senderData = conf->senderData;

  SubscriptionPtr subPtr;
  Ptr<SyncRecord> syncPtr;

  c_subscriptions.getPtr(subPtr, senderData);
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  

  /*
   * add to m_tableList
   */
  addTableId(tableId, subPtr, syncPtr.p);

  convertNameToId(subPtr, signal);
}

void 
SumaParticipant::execGET_TABLEID_REF(Signal * signal)
{
  jamEntry();
  GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr();
  Uint32 senderData         = ref->senderData;
  //  Uint32 err                = ref->err;
  
  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, senderData);
  Uint32 subData = subPtr.p->m_subscriberData;
  SubCreateRef * reff = (SubCreateRef*)ref;
  /**
   * @todo: map ref->err to GrepError.
   */
  reff->err = GrepError::SELECTED_TABLE_NOT_FOUND;
  reff->subscriberData = subData;
  sendSignal(subPtr.p->m_subscriberRef,
	     GSN_SUB_CREATE_REF, 
	     signal, 
	     SubCreateRef::SignalLength,
	     JBB);
}
#endif


/*************************************************************
 *
 * Creation of subscription id's
 *
 ************************************************************/

void 
Suma::execCREATE_SUBID_REQ(Signal* signal) 
{
  jamEntry();

  CRASH_INSERTION(13001);

  CreateSubscriptionIdReq const * req =
    (CreateSubscriptionIdReq*)signal->getDataPtr();
  SubscriberPtr subbPtr;
  if(!c_subscriberPool.seize(subbPtr)){
    jam();
    sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM);
    return;
  }

  subbPtr.p->m_subscriberRef  = signal->getSendersBlockRef(); 
  subbPtr.p->m_senderData     = req->senderData;
  subbPtr.p->m_subscriberData = subbPtr.i;

  UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
   
  utilReq->senderData  = subbPtr.p->m_subscriberData;
  utilReq->sequenceId  = SUMA_SEQUENCE;
  utilReq->requestType = UtilSequenceReq::NextVal;
  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
	     signal, UtilSequenceReq::SignalLength, JBB);
}

void
Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
{
  jamEntry();

  CRASH_INSERTION(13002);

  UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
#ifdef DEBUG_SUMA_SEQUENCE
  ndbout_c("SUMA: Create sequence conf");
#endif
  if(conf->requestType == UtilSequenceReq::Create) {
    jam();
    createSequenceReply(signal, conf, NULL);
    return;
  }

  Uint32 subId = conf->sequenceValue[0];
  Uint32 subData = conf->senderData;

  SubscriberPtr subbPtr;
  c_subscriberPool.getPtr(subbPtr,subData);
  

  CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
  subconf->subscriptionId = subId;
  subconf->subscriptionKey =(getOwnNodeId() << 16) | (subId & 0xFFFF);
  subconf->subscriberData = subbPtr.p->m_senderData;
  
  sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal,
	     CreateSubscriptionIdConf::SignalLength, JBB);

  c_subscriberPool.release(subbPtr);
}

void
Suma::execUTIL_SEQUENCE_REF(Signal* signal)
{
  jamEntry();
  UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();

  if(ref->requestType == UtilSequenceReq::Create) {
    jam();
    createSequenceReply(signal, NULL, ref);
    return;
  }

  Uint32 subData = ref->senderData;

  SubscriberPtr subbPtr;
  c_subscriberPool.getPtr(subbPtr,subData);
  sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);
  c_subscriberPool.release(subbPtr);
  return;
}//execUTIL_SEQUENCE_REF()


void
SumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){
  jam();
  CreateSubscriptionIdRef  * ref = 
    (CreateSubscriptionIdRef *)signal->getDataPtrSend();

  ref->err = errCode;
  sendSignal(signal->getSendersBlockRef(), 
	     GSN_CREATE_SUBID_REF,
	     signal, 
	     CreateSubscriptionIdRef::SignalLength,
	     JBB);
  
  releaseSections(signal);  
  return;
}

/**********************************************************
 * Suma participant interface
 *
 * Creation of subscriptions
 */

void
SumaParticipant::execSUB_CREATE_REQ(Signal* signal) {
#ifdef NODEFAIL_DEBUG
  ndbout_c("SumaParticipant::execSUB_CREATE_REQ");
#endif
  jamEntry();                            

  CRASH_INSERTION(13003);

  const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();    
  
  const Uint32 subId   = req.subscriptionId;
  const Uint32 subKey  = req.subscriptionKey;
  const Uint32 subRef  = req.subscriberRef;
  const Uint32 subData = req.subscriberData;
  const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;
  const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;
  const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
  const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;

  const Uint32 sender = signal->getSendersBlockRef();

  Subscription key;
  key.m_subscriptionId  = subId;
  key.m_subscriptionKey = subKey;

  SubscriptionPtr subPtr;
  Ptr<SyncRecord> syncPtr;
  
  if (addTableFlag) {
    ndbrequire(restartFlag);  //TODO remove this

    if(!c_subscriptions.find(subPtr, key)) {
      jam();
      sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND);
      return;
    }
    jam();
    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  } else {
    // Check that id/key is unique
    if(c_subscriptions.find(subPtr, key)) {
      jam();
      sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE);
      return;
    }
    if(!c_subscriptions.seize(subPtr)) {
      jam();
      sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);
      return;
    }
    if(!c_syncPool.seize(syncPtr)) {
      jam();
      sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);
      return;
    }
    jam();
    subPtr.p->m_subscriberRef    = subRef;
    subPtr.p->m_subscriberData   = subData;
    subPtr.p->m_subscriptionId   = subId;
    subPtr.p->m_subscriptionKey  = subKey;
    subPtr.p->m_subscriptionType = type;
  
    /**
     * ok to memset? Support on all compilers
     * @todo find out if memset is supported by all compilers
     */
    memset(subPtr.p->m_tables,0,MAX_TABLES);
    subPtr.p->m_maxTables    = 0;
    subPtr.p->m_currentTable = 0;
    subPtr.p->m_syncPtrI   = syncPtr.i;
    subPtr.p->m_markRemove = false;
    subPtr.p->m_nSubscribers = 0;

    c_subscriptions.add(subPtr);

    syncPtr.p->m_subscriptionPtrI = subPtr.i;
    syncPtr.p->m_doSendSyncData   = true;
    syncPtr.p->ptrI               = syncPtr.i;
    syncPtr.p->m_locked           = false;
    syncPtr.p->m_error            = false;
  }

  if (restartFlag || 
      type == SubCreateReq::TableEvent) {

    syncPtr.p->m_doSendSyncData = false;

    ndbrequire(type != SubCreateReq::SingleTableScan);
    jam();

    if (subPtr.p->m_tables[req.tableId] != 0) {
      ndbrequire(false); //TODO remove
      jam();
      sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED);
      return;
    }
    if (addTableFlag) {
      ndbrequire(type != SubCreateReq::TableEvent);
      jam();
    }
    subPtr.p->m_maxTables++;
    addTableId(req.tableId, subPtr, syncPtr.p);
  } else {
    switch(type){
    case SubCreateReq::SingleTableScan:
      {
	jam();
	syncPtr.p->m_tableList.append(&req.tableId, 1);
	if(signal->getNoOfSections() > 0){
	  SegmentedSectionPtr ptr;
	  signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST);
	  LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
	  append(attrBuf, ptr, getSectionSegmentPool());
	}
      }
    break;
#if 0
    case SubCreateReq::SelectiveTableSnapshot:
      /**
       * Tables specified by the user that does not exist
       * in the database are just ignored. No error message
       * is given, nor does the db nodes crash
       * @todo: Memory is not release here (used tableBuf)
       */
      {
	if(signal->getNoOfSections() == 0 ){
	  jam();
	  sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS);
	  return;
	}

	jam();      
	SegmentedSectionPtr ptr;
	signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST);
	SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool());
	Uint32 i=0;
	char table[MAX_TAB_NAME_SIZE];
	r0.reset();
	r0.first();
	while(true){
	  if ((r0.getValueType() != SimpleProperties::StringValue) ||
	      (r0.getValueLen() <= 0)) {
	    releaseSections(signal);
	    ndbrequire(false);
	  }
	  r0.getString(table);
	  strcpy(subPtr.p->m_tableNames[i],table);
	  i++;
	  if(!r0.next())
	    break;
	}
	releaseSections(signal);
	subPtr.p->m_maxTables    = i;
	subPtr.p->m_currentTable = 0;
	releaseSections(signal);
	convertNameToId(subPtr, signal);
	return;
      }
    break;
#endif
    case SubCreateReq::DatabaseSnapshot:
      {
	jam();
      }
    break;
    default:
      ndbrequire(false);
    }
  }

  sendSubCreateConf(signal, sender, subPtr);

  return;
}

void
SumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender,
				   SubscriptionPtr subPtr)
{
  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();      
  conf->subscriptionId       = subPtr.p->m_subscriptionId;
  conf->subscriptionKey      = subPtr.p->m_subscriptionKey;
  conf->subscriberData       = subPtr.p->m_subscriberData;
  sendSignal(sender, GSN_SUB_CREATE_CONF, signal,
	     SubCreateConf::SignalLength, JBB);
}

void
SumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){
  jam();
  SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
  ref->subscriberRef  = reference();
  ref->subscriberData = req.subscriberData;
  ref->err = errCode;
  releaseSections(signal);
  sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 
	     SubCreateRef::SignalLength, JBB);
  return;
}












Uint32
SumaParticipant::getFirstGCI(Signal* signal) {
  if (c_lastCompleteGCI == RNIL) {
    ndbout_c("WARNING: c_lastCompleteGCI == RNIL");
    return 0;
  }
  return c_lastCompleteGCI+3;
}

/**********************************************************
 *
 * Setting upp trigger for subscription
 *
 */

void 
SumaParticipant::execSUB_SYNC_REQ(Signal* signal) {
  jamEntry();

  CRASH_INSERTION(13004);
#ifdef EVENT_PH3_DEBUG
  ndbout_c("SumaParticipant::execSUB_SYNC_REQ");
#endif

  SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();

  SubscriptionPtr subPtr;
  Subscription key; 
  key.m_subscriptionId = req->subscriptionId;
  key.m_subscriptionKey = req->subscriptionKey;
  
  if(!c_subscriptions.find(subPtr, key)){
    jam();
    sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);
    return;
  }

  /**
   * @todo Tomas, do you really need to do this?
   */
  if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
    jam();
    subPtr.p->m_subscriberData = req->subscriberData;
  }

  bool ok = false;
  SubscriptionData::Part part = (SubscriptionData::Part)req->part;
  
  Ptr<SyncRecord> syncPtr;
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  switch(part){
  case SubscriptionData::MetaData:
    ok = true;
    jam();
    if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
      TableList::DataBufferIterator it;
      syncPtr.p->m_tableList.first(it);
      if(it.isNull()) {
	/**
	 * Get all tables from dict
	 */
	ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();
	req->senderRef   = reference();
	req->senderData  = syncPtr.i;
	req->requestData = 0;
	/**
	 * @todo: accomodate scan of index tables?
	 */
	req->setTableType(DictTabInfo::UserTable);

	sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, 
		   ListTablesReq::SignalLength, JBB);
	break;
      }
    }

    syncPtr.p->startMeta(signal);
    break;
  case SubscriptionData::TableData: {
    ok = true;
    jam();
    syncPtr.p->startScan(signal);
    break;
  }
  }
  ndbrequire(ok);
}

void
SumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){
  jam();
  SubSyncRef  * ref = 
    (SubSyncRef *)signal->getDataPtrSend();
  ref->err = errCode;
  sendSignal(signal->getSendersBlockRef(), 
	     GSN_SUB_SYNC_REF, 
	     signal, 
	     SubSyncRef::SignalLength,
	     JBB);
	     
  releaseSections(signal);  
  return;
}

/**********************************************************
 * Dict interface
 */

void
SumaParticipant::execLIST_TABLES_CONF(Signal* signal){
  jamEntry();
  CRASH_INSERTION(13005);
  ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();
  SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);
  tmp->runLIST_TABLES_CONF(signal);
}


void
SumaParticipant::execGET_TABINFOREF(Signal* signal){
  jamEntry();
  GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr();
  SyncRecord* tmp = c_syncPool.getPtr(ref->senderData);
  tmp->runGET_TABINFOREF(signal);
}

void
SumaParticipant::execGET_TABINFO_CONF(Signal* signal){
  jamEntry();

  CRASH_INSERTION(13006);

  if(!assembleFragments(signal)){
    return;
  }
  
  GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
  
  Uint32 tableId = conf->tableId;
  Uint32 senderData = conf->senderData;

  SyncRecord* tmp = c_syncPool.getPtr(senderData);
  ndbrequire(parseTable(signal, conf, tableId, tmp));
  tmp->runGET_TABINFO_CONF(signal);
}

bool
SumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId,
			    SyncRecord* syncPtr_p){

  SegmentedSectionPtr ptr;
  signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
  
  SimplePropertiesSectionReader it(ptr, getSectionSegmentPool());
  
  SimpleProperties::UnpackStatus s;
  DictTabInfo::Table tableDesc; tableDesc.init();
  s = SimpleProperties::unpack(it, &tableDesc, 
			       DictTabInfo::TableMapping, 
			       DictTabInfo::TableMappingSize, 
			       true, true);
  
  ndbrequire(s == SimpleProperties::Break);

  TablePtr tabPtr;
  c_tables.find(tabPtr, tableId);
  
  if(!tabPtr.isNull() &&
     tabPtr.p->m_schemaVersion != tableDesc.TableVersion){
    jam();

    tabPtr.p->release(* this);

    // oops wrong schema version in stored tabledesc
    // we need to find all subscriptions with old table desc
    // and all subscribers to this
    // hopefully none
    c_tables.release(tabPtr);
    tabPtr.setNull();
    DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr;
    c_subscriptions.first(i_subPtr);
    SubscriptionPtr subPtr;
    for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
      jam();
      c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
      SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
      if (tmp == syncPtr_p) {
	jam();
	continue;
      }
      if (subPtr.p->m_tables[tableId]) {
	jam();
	subPtr.p->m_tables[tableId] = 0; // remove this old table reference
	TableList::DataBufferIterator it;
	for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
	  jam();
	  if (*it.data == tableId){
	    jam();
	    Uint32 *pdata = it.data;
	    tmp->m_tableList.next(it);
	    for(;!it.isNull();tmp->m_tableList.next(it)) {
	      jam();
	      *pdata = *it.data;
	      pdata = it.data;
	    }
	    *pdata = RNIL; // todo remove this last item...
	    break;
	  }
	}
      }
    }
  }

  if (tabPtr.isNull()) {
    jam();
    /**
     * Uninitialized table record
     */
    ndbrequire(c_tables.seize(tabPtr));
    new (tabPtr.p) Table;
    tabPtr.p->m_schemaVersion = RNIL;
    tabPtr.p->m_tableId = tableId;
    tabPtr.p->m_hasTriggerDefined[0] = 0;
    tabPtr.p->m_hasTriggerDefined[1] = 0;
    tabPtr.p->m_hasTriggerDefined[2] = 0;
    tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
    tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
    tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
#if 0
    ndbout_c("Get tab info conf %d", tableId);
#endif
    c_tables.add(tabPtr);
  }

  if(tabPtr.p->m_attributes.getSize() != 0){
    jam();
    return true;
  }

  /**
   * Initialize table object
   */
  Uint32 noAttribs = tableDesc.NoOfAttributes;
  Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);
  tabPtr.p->m_schemaVersion = tableDesc.TableVersion;
  
  // The attribute buffer
  LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes);
  
  // Temporary buffer
  DataBuffer<15> theRest(c_dataBufferPool);

  if(!attrBuf.seize(noAttribs)){
    ndbrequire(false);
    return false;
  }
  
  if(!theRest.seize(notFixed)){
    ndbrequire(false);
    return false;
  }
  
  DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable
  DataBuffer<15>::DataBufferIterator restIt; // variable + nullable
  attrBuf.first(attrIt);
  theRest.first(restIt);
  
  for(Uint32 i = 0; i < noAttribs; i++) {
    DictTabInfo::Attribute attrDesc; attrDesc.init();
    s = SimpleProperties::unpack(it, &attrDesc, 
				 DictTabInfo::AttributeMapping, 
				 DictTabInfo::AttributeMappingSize, 
				 true, true);
    ndbrequire(s == SimpleProperties::Break);

    if (!attrDesc.AttributeNullableFlag 
	/* && !attrDesc.AttributeVariableFlag */) {
      jam();
      * attrIt.data = attrDesc.AttributeId;
      attrBuf.next(attrIt);
    } else {
      jam();
      * restIt.data = attrDesc.AttributeId;
      theRest.next(restIt);
    }
    
    // Move to next attribute
    it.next();
  }

  /**
   * Put the rest in end of attrBuf
   */
  theRest.first(restIt);
  for(; !restIt.isNull(); theRest.next(restIt)){
    * attrIt.data = * restIt.data;
    attrBuf.next(attrIt);
  }

  theRest.release();
  
  return true;
}

void
SumaParticipant::execDI_FCOUNTCONF(Signal* signal){
  jamEntry();
  
  CRASH_INSERTION(13007);

  const Uint32 senderData = signal->theData[3];
  SyncRecord* tmp = c_syncPool.getPtr(senderData);
  tmp->runDI_FCOUNTCONF(signal);
}

void 
SumaParticipant::execDIGETPRIMCONF(Signal* signal){
  jamEntry();
  
  CRASH_INSERTION(13008);

  const Uint32 senderData = signal->theData[1];
  SyncRecord* tmp = c_syncPool.getPtr(senderData);
  tmp->runDIGETPRIMCONF(signal);
}

void
SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
  jamEntry();

  CRASH_INSERTION(13009);

  CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();

  const Uint32 senderData = conf->getConnectionPtr();
  SyncRecord* tmp = c_syncPool.getPtr(senderData);
  tmp->runCREATE_TRIG_CONF(signal);
  
  /**
   * dodido
   * @todo: I (Johan) dont know what to do here. Jonas, what do you mean?
   */
}

void
SumaParticipant::execCREATE_TRIG_REF(Signal* signal){
  jamEntry();
  ndbrequire(false);
}

void
SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
  jamEntry();

  CRASH_INSERTION(13010);

  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();

  const Uint32 senderData = conf->getConnectionPtr();
  SyncRecord* tmp = c_syncPool.getPtr(senderData);
  tmp->runDROP_TRIG_CONF(signal);
}

void
SumaParticipant::execDROP_TRIG_REF(Signal* signal){
  jamEntry();

  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();

  const Uint32 senderData = ref->getConnectionPtr();
  SyncRecord* tmp = c_syncPool.getPtr(senderData);
  tmp->runDROP_TRIG_CONF(signal);
}

/*************************************************************************
 *
 *
 */

void
SumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){
  jam();

  ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
  const Uint32 len = signal->length() - ListTablesConf::HeaderLength;

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);

  for (unsigned i = 0; i < len; i++) {
    subPtr.p->m_maxTables++;
    suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
  }

  //  for (unsigned i = 0; i < len; i++)
  //    conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
  //  m_tableList.append(&conf->tableData[0], len);

#if 0 
  TableList::DataBufferIterator it;
  int i = 0;
  for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
    ndbout_c("%u listtableconf tableid %d", i++, *it.data);
  }
#endif

  if(len == ListTablesConf::DataLength){
    jam();
    // we expect more LIST_TABLE_CONF
    return;
  }

#if 0
  subPtr.p->m_currentTable = 0;
  subPtr.p->m_maxTables    = 0;

  TableList::DataBufferIterator it;
  for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
    subPtr.p->m_maxTables++;
    suma.addTableId(*it.data, subPtr, NULL);
#ifdef NODEFAIL_DEBUG
    ndbout_c(" listtableconf tableid %d",*it.data);
#endif
  }
#endif
  
  startMeta(signal);
}

void
SumaParticipant::SyncRecord::startMeta(Signal* signal){
  jam();
  m_currentTable = 0;
  nextMeta(signal);
}

/**
 * m_tableList only contains UserTables
 */
void
SumaParticipant::SyncRecord::nextMeta(Signal* signal){
  jam();
  
  TableList::DataBufferIterator it;
  if(!m_tableList.position(it, m_currentTable)){
    completeMeta(signal);
    return;
  }

  GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
  req->senderRef = suma.reference();
  req->senderData = ptrI;
  req->requestType = 
    GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
  req->tableId = * it.data;

#if 0
  ndbout_c("GET_TABINFOREQ id %d", req->tableId);
#endif
  suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 
		  GetTabInfoReq::SignalLength, JBB);
}

void
SumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal)
{
  jam();

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);

  Uint32 type = subPtr.p->m_subscriptionType;

  bool do_continue = false;
  switch (type) {
  case SubCreateReq::TableEvent:
    jam();
    break;
  case SubCreateReq::DatabaseSnapshot:
    jam();
    do_continue = true;
    break;
  case SubCreateReq::SelectiveTableSnapshot:
    jam();
    do_continue = true;
    break;
  case SubCreateReq::SingleTableScan:
    jam();
    break;
  default:
    ndbrequire(false);
    break;
  }

  if (! do_continue) {
    m_error = true;
    completeMeta(signal);
    return;
  }

  m_currentTable++;
  nextMeta(signal);
  return;

  // now we need to clean-up
}


void
SumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){
  jam();
  
  GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr();
  //  const Uint32 gci = conf->gci;
  const Uint32 tableId = conf->tableId;
  TableList::DataBufferIterator it;
  
  ndbrequire(m_tableList.position(it, m_currentTable));
  ndbrequire(* it.data == tableId);
  
  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  
  SegmentedSectionPtr ptr;
  signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);

  SubMetaData * data = (SubMetaData*)signal->getDataPtrSend();
  /** 
   * sending lastCompleteGCI. Used by Lars in interval calculations
   * incremenet by one, since last_CompleteGCI is the not the current gci.
   */
  data->gci = suma.c_lastCompleteGCI + 1;
  data->tableId = tableId;
  data->senderData = subPtr.p->m_subscriberData;
#if PRINT_ONLY
  ndbout_c("GSN_SUB_META_DATA Table %d", tableId);
#else

  bool okToSend = m_doSendSyncData;

  /*
   * If it is a selectivetablesnapshot and the table is not part of the 
   * subscription, then do not send anything, just continue.
   * If it is a tablevent, don't send regardless since the APIs are not
   * interested in meta data.
   */
  if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)
    if(!subPtr.p->m_tables[tableId])
      okToSend = false;

  if(okToSend) {
    if(refToNode(subPtr.p->m_subscriberRef) == 0){
      jam();
      suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef),
			  GSN_SUB_META_DATA,
			  signal, 
			  SubMetaData::SignalLength); 
      jamEntry();
      suma.releaseSections(signal);
    } else {
      jam();
      suma.sendSignal(subPtr.p->m_subscriberRef, 
		      GSN_SUB_META_DATA,
		      signal, 
		      SubMetaData::SignalLength, JBB);
    }
  }
#endif
  
  TablePtr tabPtr;
  ndbrequire(suma.c_tables.find(tabPtr, tableId));
  
  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
  if(fragBuf.getSize() == 0){
    /**
     * We need to gather fragment info
     */
    jam();
    signal->theData[0] = RNIL;
    signal->theData[1] = tableId;
    signal->theData[2] = ptrI;
    suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);    
    return;
  }
  
  m_currentTable++;
  nextMeta(signal);
}

void 
SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){
  jam();

  const Uint32 userPtr = signal->theData[0];
  const Uint32 fragCount = signal->theData[1];
  const Uint32 tableId = signal->theData[2];

  ndbrequire(userPtr == RNIL && signal->length() == 5);

  TablePtr tabPtr;
  ndbrequire(suma.c_tables.find(tabPtr, tableId));
  
  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);  
  ndbrequire(fragBuf.getSize() == 0);
  
  m_currentFragment = fragCount;
  signal->theData[0] = RNIL;
  signal->theData[1] = ptrI;
  signal->theData[2] = tableId;
  signal->theData[3] = 0; // Frag no
  suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
}

void
SumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){
  jam();

  const Uint32 userPtr = signal->theData[0];
  //const Uint32 senderData = signal->theData[1];
  const Uint32 nodeCount = signal->theData[6];
  const Uint32 tableId = signal->theData[7];
  const Uint32 fragNo = signal->theData[8];
  
  ndbrequire(userPtr == RNIL && signal->length() == 9);
  ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
  
  TablePtr tabPtr;
  ndbrequire(suma.c_tables.find(tabPtr, tableId));
  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);  

  /**
   * Add primary node for fragment to list
   */
  FragmentDescriptor fd;
  fd.m_fragDesc.m_nodeId = signal->theData[2];
  fd.m_fragDesc.m_fragmentNo = fragNo;
  signal->theData[2] = fd.m_dummy;
  fragBuf.append(&signal->theData[2], 1);
  
  const Uint32 nextFrag = fragNo + 1;
  if(nextFrag == m_currentFragment){
    /**
     * Complete frag info for table
     */
    m_currentTable++;
    nextMeta(signal);
    return;
  }
  signal->theData[0] = RNIL;
  signal->theData[1] = ptrI;
  signal->theData[2] = tableId;
  signal->theData[3] = nextFrag; // Frag no
  suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
}

void
SumaParticipant::SyncRecord::completeMeta(Signal* signal){
  jam();
  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  
#if PRINT_ONLY
  ndbout_c("GSN_SUB_SYNC_CONF (meta)");
#else
 
  suma.releaseSections(signal);

  if (m_error) {
    SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
    ref->subscriptionId = subPtr.p->m_subscriptionId;
    ref->subscriptionKey = subPtr.p->m_subscriptionKey;
    ref->part = SubscriptionData::MetaData;
    ref->subscriberData = subPtr.p->m_subscriberData;
    ref->errorCode = SubSyncRef::Undefined;
    suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal,
		    SubSyncRef::SignalLength, JBB);
  } else {
    SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
    conf->subscriptionId = subPtr.p->m_subscriptionId;
    conf->subscriptionKey = subPtr.p->m_subscriptionKey;
    conf->part = SubscriptionData::MetaData;
    conf->subscriberData = subPtr.p->m_subscriberData;
    suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
		    SubSyncConf::SignalLength, JBB);
  }
#endif
}

/**********************************************************
 *
 * Scan interface
 *
 */

void
SumaParticipant::SyncRecord::startScan(Signal* signal){
  jam();
  
  /**
   * Get fraginfo
   */
  m_currentTable = 0;
  m_currentFragment = 0;
  
  nextScan(signal);
}

bool
SumaParticipant::SyncRecord::getNextFragment(TablePtr * tab, 
					     FragmentDescriptor * fd){
  jam();
  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  TableList::DataBufferIterator tabIt;
  DataBuffer<15>::DataBufferIterator fragIt;
  
  m_tableList.position(tabIt, m_currentTable);
  for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){
    TablePtr tabPtr;
    ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));
    if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) 
      {
	if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {
	  *tab = tabPtr;
	  return true;
	}
      }
    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);
    
    fragBuf.position(fragIt, m_currentFragment);
    for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){
      FragmentDescriptor tmp;
      tmp.m_dummy = * fragIt.data;
      if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
	* fd = tmp;
	* tab = tabPtr;
	return true;
      }
    }
    m_currentFragment = 0;
  }
  return false;
}

void
SumaParticipant::SyncRecord::nextScan(Signal* signal){
  jam();
  TablePtr tabPtr;
  FragmentDescriptor fd;
  SubscriptionPtr subPtr;
  if(!getNextFragment(&tabPtr, &fd)){
    jam();
    completeScan(signal);
    return;
  }
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);
 
  if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) {
    jam();
    if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {
      /*
       * table is not part of the subscription. Check next table
       */
      m_currentTable++;
      nextScan(signal);
      return;
    }
  }

  DataBuffer<15>::Head head = m_attributeList;
  if(head.getSize() == 0){
    head = tabPtr.p->m_attributes;
  }
  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
  
  ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
  const Uint32 parallelism = 16;
  const Uint32 attrLen = 5 + attrBuf.getSize();

  req->senderData = m_subscriptionPtrI;
  req->resultRef = suma.reference();
  req->tableId = tabPtr.p->m_tableId;
  req->requestInfo = 0;
  req->savePointId = 0;
  ScanFragReq::setLockMode(req->requestInfo, 0);
  ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
  ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
  ScanFragReq::setAttrLen(req->requestInfo, attrLen);
  req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
  req->schemaVersion = tabPtr.p->m_schemaVersion;
  req->transId1 = 0;
  req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
  req->clientOpPtr = (ptrI << 16);
  req->batch_size_rows= 16;
  req->batch_size_bytes= 0;
  suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 
		  ScanFragReq::SignalLength, JBB);
  
  signal->theData[0] = ptrI;
  signal->theData[1] = 0;
  signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);
  
  // Return all
  signal->theData[3] = attrBuf.getSize();
  signal->theData[4] = 0;
  signal->theData[5] = 0;
  signal->theData[6] = 0;
  signal->theData[7] = 0;
  
  Uint32 dataPos = 8;
  DataBuffer<15>::DataBufferIterator it;
  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
    AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);
    if(dataPos == 25){
      suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB);
	dataPos = 3;
    }
  }
  if(dataPos != 3){
    suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);
  }
  
  m_currentTableId = tabPtr.p->m_tableId;
  m_currentNoOfAttributes = attrBuf.getSize();        
}


void
SumaParticipant::execSCAN_FRAGREF(Signal* signal){
  jamEntry();

//  ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
  ndbrequire(false);
}

void
SumaParticipant::execSCAN_FRAGCONF(Signal* signal){
  jamEntry();

  CRASH_INSERTION(13011);

  ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
  
  const Uint32 completed = conf->fragmentCompleted;
  const Uint32 senderData = conf->senderData;
  const Uint32 completedOps = conf->completedOps;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, senderData);
  
  if(completed != 2){
    jam();
    
#if PRINT_ONLY
    SubSyncContinueConf * const conf = 
      (SubSyncContinueConf*)signal->getDataPtrSend();  
    conf->subscriptionId = subPtr.p->m_subscriptionId;
    conf->subscriptionKey = subPtr.p->m_subscriptionKey;
    execSUB_SYNC_CONTINUE_CONF(signal);
#else
    SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
    req->subscriberData = subPtr.p->m_subscriberData;
    req->noOfRowsSent = completedOps;
    sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
	       SubSyncContinueReq::SignalLength, JBB);
#endif
    return;
  }

  ndbrequire(completedOps == 0);
  
  SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
  
  tmp->m_currentFragment++;
  tmp->nextScan(signal);
}

void
SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
  jamEntry();
  
  CRASH_INSERTION(13012);

  SubSyncContinueConf * const conf = 
    (SubSyncContinueConf*)signal->getDataPtr();  
  
  SubscriptionPtr subPtr;
  Subscription key; 
  key.m_subscriptionId = conf->subscriptionId;
  key.m_subscriptionKey = conf->subscriptionKey;
  
  ndbrequire(c_subscriptions.find(subPtr, key));

  ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
  req->senderData = subPtr.i;
  req->closeFlag = 0;
  req->transId1 = 0;
  req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
  req->batch_size_rows = 16;
  req->batch_size_bytes = 0;
  sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 
	     ScanFragNextReq::SignalLength, JBB);
}

void
SumaParticipant::SyncRecord::completeScan(Signal* signal){
  jam();
  //  m_tableList.release();

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  
#if PRINT_ONLY
  ndbout_c("GSN_SUB_SYNC_CONF (data)");
#else
  SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
  conf->subscriptionId = subPtr.p->m_subscriptionId;
  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  conf->part = SubscriptionData::TableData;
  conf->subscriberData = subPtr.p->m_subscriberData;
  suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
		  SubSyncConf::SignalLength, JBB);
#endif
}

void
SumaParticipant::execSCAN_HBREP(Signal* signal){
  jamEntry();
#if 0
  ndbout << "execSCAN_HBREP" << endl << hex;
  for(int i = 0; i<signal->length(); i++){
    ndbout << signal->theData[i] << " ";
    if(((i + 1) % 8) == 0)
      ndbout << endl << hex;
  }
  ndbout << endl;
#endif
}

/**********************************************************
 *
 * Suma participant interface
 *
 * Creation of subscriber
 *
 */

void
SumaParticipant::execSUB_START_REQ(Signal* signal){
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_START_REQ");
#endif

  CRASH_INSERTION(13013);

  if (c_restartLock) {
    jam();
    //    ndbout_c("c_restartLock");
    if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
      jam();
      sendSubStartRef(signal, /** Error Code */ 0, true);
      return;
    }
    // only allow other Suma's in the nodegroup to come through for restart purposes
  }

  Subscription key; 

  SubStartReq * const req = (SubStartReq*)signal->getDataPtr();

  Uint32 senderRef            = req->senderRef;
  Uint32 senderData           = req->senderData;
  Uint32 subscriberData       = req->subscriberData;
  Uint32 subscriberRef        = req->subscriberRef;
  SubscriptionData::Part part = (SubscriptionData::Part)req->part;
  key.m_subscriptionId        = req->subscriptionId;
  key.m_subscriptionKey       = req->subscriptionKey;

  SubscriptionPtr subPtr;
  if(!c_subscriptions.find(subPtr, key)){
    jam();
    sendSubStartRef(signal, /** Error Code */ 0);
    return;
  }
  
  Ptr<SyncRecord> syncPtr;
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  if (syncPtr.p->m_locked) {
    jam();
#if 0
    ndbout_c("Locked");
#endif
    sendSubStartRef(signal, /** Error Code */ 0, true);
    return;
  }
  syncPtr.p->m_locked = true;

  SubscriberPtr subbPtr;
  if(!c_subscriberPool.seize(subbPtr)){
    jam();
    syncPtr.p->m_locked = false;
    sendSubStartRef(signal, /** Error Code */ 0);
    return;
  }

  Uint32 type = subPtr.p->m_subscriptionType;

  subbPtr.p->m_senderRef  = senderRef;
  subbPtr.p->m_senderData = senderData;

  switch (type) {
  case SubCreateReq::TableEvent:
    jam();
    // we want the data to return to the API not DICT
    subbPtr.p->m_subscriberRef = subscriberRef;
    //    ndbout_c("start ref = %u", signal->getSendersBlockRef());
    //    ndbout_c("ref = %u", subbPtr.p->m_subscriberRef);
    // we use the subscription id for now, should really be API choice
    subbPtr.p->m_subscriberData = subscriberData;

#if 0
    if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
      jam();
      for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
	Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
	if (ref != reference()) {
	  jam();
	  sendSubStartReq(subPtr, subbPtr, signal, ref);
	} else
	  jam();
      }
    }
#endif
    break;
  case SubCreateReq::DatabaseSnapshot:
  case SubCreateReq::SelectiveTableSnapshot:
    jam();
    subbPtr.p->m_subscriberRef = GREP_REF;
    subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
    break;
  case SubCreateReq::SingleTableScan:
    jam();
    subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef;
    subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
  }
  
  subbPtr.p->m_subPtrI = subPtr.i;
  subbPtr.p->m_firstGCI = RNIL;
  if (type == SubCreateReq::TableEvent)
    subbPtr.p->m_lastGCI = 0;
  else
    subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI
  bool ok = false;
  
  switch(part){
  case SubscriptionData::MetaData:
    ok = true;
    jam();
    c_metaSubscribers.add(subbPtr);
    sendSubStartComplete(signal, subbPtr, 0, part);
    break;
  case SubscriptionData::TableData: 
    ok = true;
    jam();
    c_prepDataSubscribers.add(subbPtr);
    syncPtr.p->startTrigger(signal);
    break;
  }
  ndbrequire(ok);
}

void
SumaParticipant::sendSubStartComplete(Signal* signal,
				      SubscriberPtr subbPtr, 
				      Uint32 firstGCI,
				      SubscriptionData::Part part){
  jam();

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);

  Ptr<SyncRecord> syncPtr;
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  syncPtr.p->m_locked = false;

  SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();    
  
  conf->senderRef       = reference();
  conf->senderData      = subbPtr.p->m_senderData;
  conf->subscriptionId  = subPtr.p->m_subscriptionId;
  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  conf->firstGCI = firstGCI;
  conf->part = (Uint32) part;
  
  conf->subscriberData = subPtr.p->m_subscriberData;
  sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal,
	     SubStartConf::SignalLength, JBB);
}

#if 0
void
SumaParticipant::sendSubStartRef(SubscriptionPtr subPtr,
				 Signal* signal, Uint32 errCode,
				 bool temporary){
  jam();
  SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
  xxx ref->senderRef       = reference();
  xxx ref->senderData      = subPtr.p->m_senderData;
  ref->subscriptionId  = subPtr.p->m_subscriptionId;
  ref->subscriptionKey = subPtr.p->m_subscriptionKey;
  ref->part            = (Uint32) subPtr.p->m_subscriptionType;
  ref->subscriberData  = subPtr.p->m_subscriberData;
  ref->err             = errCode;
  if (temporary) {
    jam();
    ref->setTemporary();
  }
  releaseSections(signal);
  sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal, 
	     SubStartRef::SignalLength, JBB);
}
#endif
void
SumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode,
				 bool temporary){
  jam();
  SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
  ref->senderRef  = reference();
  ref->err = errCode;
  if (temporary) {
    jam();
    ref->setTemporary();
  }
  releaseSections(signal);
  sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, 
	     SubStartRef::SignalLength, JBB);
}

/**********************************************************
 *
 * Trigger admin interface
 *
 */

void
SumaParticipant::SyncRecord::startTrigger(Signal* signal){
  jam();
  m_currentTable = 0;
  m_latestTriggerId = RNIL;
  nextTrigger(signal);
}

void
SumaParticipant::SyncRecord::nextTrigger(Signal* signal){
  jam();

  TableList::DataBufferIterator it;
  
  if(!m_tableList.position(it, m_currentTable)){
    completeTrigger(signal);
    return;
  }

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);
  const Uint32 RT_BREAK = 48;
  Uint32 latestTriggerId = 0;
  for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){   
    TablePtr tabPtr;
#if 0
    ndbout_c("nextTrigger tableid %u", *it.data);
#endif
    ndbrequire(suma.c_tables.find(tabPtr, *it.data));

    AttributeMask attrMask;
    createAttributeMask(attrMask, tabPtr.p);

    for(Uint32 j = 0; j<3; j++){
      i++;
      latestTriggerId = (tabPtr.p->m_schemaVersion << 18) |
	(j << 16) | tabPtr.p->m_tableId;
      if(tabPtr.p->m_hasTriggerDefined[j] == 0) {
	ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
#if 0
	ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j);
#endif
	CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
	req->setUserRef(SUMA_REF);
	req->setConnectionPtr(ptrI);
	req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
	req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
	req->setMonitorReplicas(true);
	req->setMonitorAllAttributes(false);
	req->setReceiverRef(SUMA_REF);
	req->setTriggerId(latestTriggerId);
	req->setTriggerEvent((TriggerEvent::Value)j);
	req->setTableId(tabPtr.p->m_tableId);
	req->setAttributeMask(attrMask);
	suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 
			signal, CreateTrigReq::SignalLength, JBB);

      } else {
	/**
	 * Faking that a trigger has been created in order to
	 * simulate the proper behaviour.
	 * Perhaps this should be a dummy signal instead of 
	 * (ab)using CREATE_TRIG_CONF.
	 */ 
	CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend();
	conf->setConnectionPtr(ptrI);
	conf->setTableId(tabPtr.p->m_tableId);
	conf->setTriggerId(latestTriggerId);
	suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF,
			signal, CreateTrigConf::SignalLength, JBB);
	  
      }

    }
    m_currentTable++;
  }
  m_latestTriggerId = latestTriggerId;
}

void
SumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask, 
						 Table * table){
  jam();
  mask.clear();
  DataBuffer<15>::DataBufferIterator it;
  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes);
  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
    mask.set(* it.data);
  }
}

void
SumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){
  jam();
  
  CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
  const Uint32 triggerId = conf->getTriggerId();
  Uint32 type = (triggerId >> 16) & 0x3;
  Uint32 tableId = conf->getTableId();
  
  TablePtr tabPtr;
  ndbrequire(suma.c_tables.find(tabPtr, tableId));

  ndbrequire(type < 3);
  tabPtr.p->m_triggerIds[type] = triggerId;
  tabPtr.p->m_hasTriggerDefined[type]++;

  if(triggerId == m_latestTriggerId){
    jam();
    nextTrigger(signal);
  }
}

void
SumaParticipant::SyncRecord::completeTrigger(Signal* signal){
  jam();
  SubscriptionPtr subPtr;
  CRASH_INSERTION(13013);
#ifdef EVENT_PH3_DEBUG
  ndbout_c("SumaParticipant: trigger completed");
#endif
  Uint32 gci;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);

  SubscriberPtr subbPtr;
  {
    bool found = false;

    for(suma.c_prepDataSubscribers.first(subbPtr);
	!subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
      jam();
      if(subbPtr.p->m_subPtrI == subPtr.i) {
	jam();
	found = true;
	break;
      }
    }
    ndbrequire(found);
    gci = suma.getFirstGCI(signal);
    subbPtr.p->m_firstGCI = gci;
    suma.c_prepDataSubscribers.remove(subbPtr);
    suma.c_dataSubscribers.add(subbPtr);
  }
  suma.sendSubStartComplete(signal, subbPtr, gci,  SubscriptionData::TableData);
}

void
SumaParticipant::SyncRecord::startDropTrigger(Signal* signal){
  jam();
  m_currentTable = 0;
  m_latestTriggerId = RNIL;
  nextDropTrigger(signal);
}

void
SumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){
  jam();

  TableList::DataBufferIterator it;
  
  if(!m_tableList.position(it, m_currentTable)){
    completeDropTrigger(signal);
    return;
  }

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);

  const Uint32 RT_BREAK = 48;
  Uint32 latestTriggerId = 0;
  for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){
    jam();
    TablePtr tabPtr;
#if 0
    ndbout_c("nextDropTrigger tableid %u", *it.data);
#endif
    ndbrequire(suma.c_tables.find(tabPtr, * it.data));

    for(Uint32 j = 0; j<3; j++){
      jam();
      ndbrequire(tabPtr.p->m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
      i++;
      latestTriggerId = tabPtr.p->m_triggerIds[j];
      if(tabPtr.p->m_hasTriggerDefined[j] == 1) {
	jam();

	DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
	req->setConnectionPtr(ptrI);
	req->setUserRef(SUMA_REF); // Sending to myself
	req->setRequestType(DropTrigReq::RT_USER);
	req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
	req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
	req->setIndexId(RNIL);

	req->setTableId(tabPtr.p->m_tableId);
	req->setTriggerId(latestTriggerId);
	req->setTriggerEvent((TriggerEvent::Value)j);

#if 0
	ndbout_c("DROPPING trigger %u = %u %u %u on table %u[%u]",
		 latestTriggerId,TriggerType::SUBSCRIPTION_BEFORE,
		 TriggerActionTime::TA_DETACHED, j, tabPtr.p->m_tableId, j);
#endif
	suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
			signal, DropTrigReq::SignalLength, JBB);
      } else {
	jam();
	ndbrequire(tabPtr.p->m_hasTriggerDefined[j] > 1);
	/**
	 * Faking that a trigger has been dropped in order to
	 * simulate the proper behaviour.
	 * Perhaps this should be a dummy signal instead of 
	 * (ab)using DROP_TRIG_CONF.
	 */ 
	DropTrigConf * conf = (DropTrigConf*)signal->getDataPtrSend();
	conf->setConnectionPtr(ptrI);
	conf->setTableId(tabPtr.p->m_tableId);
	conf->setTriggerId(latestTriggerId);
	suma.sendSignal(SUMA_REF,GSN_DROP_TRIG_CONF,
			signal, DropTrigConf::SignalLength, JBB);
      }
    }
    m_currentTable++;
  }
  m_latestTriggerId = latestTriggerId;
}

void
SumaParticipant::SyncRecord::runDROP_TRIG_REF(Signal* signal){
  jam();
  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
  if (ref->getErrorCode() != DropTrigRef::TriggerNotFound){
    ndbrequire(false);
  }
  const Uint32 triggerId = ref->getTriggerId();
  Uint32 tableId = ref->getTableId();
  runDropTrig(signal, triggerId, tableId);
}

void
SumaParticipant::SyncRecord::runDROP_TRIG_CONF(Signal* signal){
  jam();
  
  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
  const Uint32 triggerId = conf->getTriggerId();
  Uint32 tableId = conf->getTableId();
  runDropTrig(signal, triggerId, tableId);
}

void
SumaParticipant::SyncRecord::runDropTrig(Signal* signal,
					 Uint32 triggerId,
					 Uint32 tableId){
  Uint32 type = (triggerId >> 16) & 0x3;
  
  TablePtr tabPtr;
  ndbrequire(suma.c_tables.find(tabPtr, tableId));

  ndbrequire(type < 3);
  ndbrequire(tabPtr.p->m_triggerIds[type] == triggerId);
  tabPtr.p->m_hasTriggerDefined[type]--;
  if (tabPtr.p->m_hasTriggerDefined[type] == 0) {
    jam();
    tabPtr.p->m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
  }
  if(triggerId == m_latestTriggerId){
    jam();
    nextDropTrigger(signal);
  }
}

void
SumaParticipant::SyncRecord::completeDropTrigger(Signal* signal){
  jam();
  SubscriptionPtr subPtr;
  CRASH_INSERTION(13014);
#if 0
  ndbout_c("trigger completed");
#endif

  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
  ndbrequire(subPtr.p->m_syncPtrI == ptrI);

  bool found = false;
  SubscriberPtr subbPtr;
  for(suma.c_prepDataSubscribers.first(subbPtr);
      !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
    jam();
    if(subbPtr.p->m_subPtrI == subPtr.i) {
      jam();
      found = true;
      break;
    }
  }
  ndbrequire(found);
  suma.sendSubStopComplete(signal, subbPtr);
}

/**********************************************************
 * Scan data interface
 *
 * Assumption: one execTRANSID_AI contains all attr info
 *
 */

#define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
#define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1

static Uint32 f_bufferLock = 0;
static Uint32 f_buffer[SUMA_BUF_SZ];
static Uint32 f_trigBufferSize = 0;
static Uint32 b_bufferLock = 0;
static Uint32 b_buffer[SUMA_BUF_SZ];
static Uint32 b_trigBufferSize = 0;

void
SumaParticipant::execTRANSID_AI(Signal* signal){
  jamEntry();

  CRASH_INSERTION(13015);
  TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
  const Uint32 opPtrI = data->connectPtr;
  const Uint32 length = signal->length() - 3;

  if(f_bufferLock == 0){
    f_bufferLock = opPtrI;
  } else {
    ndbrequire(f_bufferLock == opPtrI);
  }
  
  Ptr<SyncRecord> syncPtr;
  c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
  
  Uint32 sum = 0;
  Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
  Uint32 * headers = f_buffer;
  const Uint32 * src = &data->attrData[0];
  const Uint32 * const end = &src[length];
  
  const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
  for(Uint32 i = 0; i<attribs; i++){
    Uint32 tmp = * src++;
    * headers++ = tmp;
    Uint32 len = AttributeHeader::getDataSize(tmp);
    
    memcpy(dst, src, 4 * len);
    dst += len;
    src += len;
    sum += len;
  }
  
  ndbrequire(src == end);

  /**
   * Send data to subscriber
   */
  LinearSectionPtr ptr[3];
  ptr[0].p = f_buffer;
  ptr[0].sz = attribs;
  
  ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
  ptr[1].sz = sum;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
  
  /**
   * Initialize signal
   */  
  SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
  Uint32 ref = subPtr.p->m_subscriberRef;
  sdata->tableId = syncPtr.p->m_currentTableId;
  sdata->senderData = subPtr.p->m_subscriberData;
  sdata->operation = 3; // Scan
  sdata->gci = 1; // Undefined
#if PRINT_ONLY
  ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
#else
  sendSignal(ref,
	     GSN_SUB_TABLE_DATA,
	     signal, 
	     SubTableData::SignalLength, JBB,
	     ptr, 2);
#endif
  
  /**
   * Reset f_bufferLock
   */
  f_bufferLock = 0;
}

/**********************************************************
 *
 * Trigger data interface
 *
 */

void
SumaParticipant::execTRIG_ATTRINFO(Signal* signal){
  jamEntry();
  
  CRASH_INSERTION(13016);
  TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
  const Uint32 trigId = trg->getTriggerId();

  const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;

  if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
    jam();

    ndbrequire(b_bufferLock == trigId);

    memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
    b_trigBufferSize += dataLen;
    // printf("before values %u %u %u\n",trigId, dataLen,  b_trigBufferSize);
  } else {
    jam();

    if(f_bufferLock == 0){
      f_bufferLock = trigId;
      f_trigBufferSize = 0;
      b_bufferLock = trigId;
      b_trigBufferSize = 0;
    } else {
      ndbrequire(f_bufferLock == trigId);
    }

    memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
    f_trigBufferSize += dataLen;
  }
}

#ifdef NODEFAIL_DEBUG2
static int theCounts[64] = {0};
#endif

Uint32 
Suma::getStoreBucket(Uint32 v)
{
  // id will contain id to responsible suma or 
  // RNIL if we don't have nodegroup info yet

  const Uint32 N = NO_OF_BUCKETS;
  const Uint32 D = v % N;            // Distibution key
  return D;
}

Uint32 
Suma::getResponsibleSumaNodeId(Uint32 D)
{
  // id will contain id to responsible suma or 
  // RNIL if we don't have nodegroup info yet

  Uint32 id;

  if (c_restartLock) {
    jam();
    //    ndbout_c("c_restartLock");
    id = RNIL;
  } else {
    jam();
    id = RNIL;
    const Uint32 n = c_noNodesInGroup; // Number nodes in node group
    const Uint32 C1 = D / n;
    const Uint32 C2 = D - C1*n; // = D % n;
    const Uint32 C = C2 + C1 % n;
    for (Uint32 i = 0; i < n; i++) {
      jam();
      id = c_nodesInGroup[(C + i) % n];
      if (c_aliveNodes.get(id) &&
	  !c_preparingNodes.get(id)) {
        jam();
	break;
      }//if
    }
#ifdef NODEFAIL_DEBUG2
    theCounts[id]++;
    ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
	     n,D, id, theCounts[id]);
#endif
  }
  return id;
}

Uint32
SumaParticipant::decideWhoToSend(Uint32 nBucket, Uint32 gci){
  bool replicaFlag = true;
  Uint32 nId = RNIL;

  // bucket active/not active set by GCP_COMPLETE
  if (c_buckets[nBucket].active) {
    if (c_buckets[nBucket].handover && c_buckets[nBucket].handoverGCI <= gci) {
      jam();
      replicaFlag = true; // let the other node send this
      nId = RNIL;
      // mark this as started, if we get a node failiure now we have some lost stuff
      c_buckets[nBucket].handover_started = true;
    } else {
      jam();
      replicaFlag = false;
      nId = refToNode(reference());
    }
  } else {
    nId  = getResponsibleSumaNodeId(nBucket);
    replicaFlag = !(nId == refToNode(reference()));
    
    if (!replicaFlag) {
      if (!c_buckets[nBucket].handover) {
	jam();
	// appearently a node has failed and we are taking over sending
	// from that bucket.  Now we need to go back to latest completed
	// GCI.  Handling will depend on Subscriber and Subscription
	
	// TODO, for now we make an easy takeover
	if (gci < c_nodeFailGCI)
	  c_lastInconsistentGCI = gci;
	
	// we now have responsability for this bucket and we're actively
	// sending from that
	c_buckets[nBucket].active = true;
#ifdef HANDOVER_DEBUG
	ndbout_c("Takeover Bucket %u", nBucket);
#endif
      } else if (c_buckets[nBucket].handoverGCI > gci) {
	jam();
	replicaFlag = true; // handover going on, but don't start sending yet
	nId = RNIL;
      } else {
	jam();
#ifdef HANDOVER_DEBUG
	ndbout_c("Possible error: Will send from GCI = %u", gci);
#endif
	}
    }
  }
  
#ifdef NODEFAIL_DEBUG2
  ndbout_c("Suma:bucket %u, responsible id = %u, replicaFlag = %u",
	   nBucket, nId, (Uint32)replicaFlag);
#endif
  return replicaFlag;
}

void
SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
  jamEntry();

  CRASH_INSERTION(13016);
  FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
  const Uint32 trigId    = trg->getTriggerId();
  const Uint32 hashValue = trg->getHashValue();
  const Uint32 gci       = trg->getGCI();
  const Uint32 event     = trg->getTriggerEvent();
  const Uint32 triggerId = trg->getTriggerId();
  Uint32 tableId         = triggerId & 0xFFFF;

  ndbrequire(f_bufferLock == trigId);
  
#ifdef EVENT_DEBUG2
  ndbout_c("SumaParticipant::execFIRE_TRIG_ORD");
#endif

  Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
  ndbrequire(sz == f_trigBufferSize);

  /**
   * Reformat as "all headers" + "all data"
   */
  Uint32 dataLen   = 0;
  Uint32 noOfAttrs = 0;
  Uint32 * src     = f_buffer;
  Uint32 * headers = signal->theData + 25;
  Uint32 * dst     = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;

  LinearSectionPtr ptr[3];
  int nptr;

  ptr[0].p  = headers;
  ptr[1].p  = dst;

  while(sz > 0){
    jam();
    Uint32 tmp = * src ++;
    * headers ++ = tmp;
    Uint32 len = AttributeHeader::getDataSize(tmp);
    memcpy(dst, src, 4 * len);
    dst += len;
    src += len;
    
    noOfAttrs++;
    dataLen += len;
    sz -= (1 + len);
  }
  ndbrequire(sz == 0);

  ptr[0].sz = noOfAttrs;
  ptr[1].sz = dataLen;

  if (b_trigBufferSize > 0) {
    jam();
    ptr[2].p  = b_buffer;
    ptr[2].sz = b_trigBufferSize;
    nptr = 3;
  } else {
    jam();
    nptr = 2;
  }

  // right now only for tableEvent
  bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);

  /**
   * Signal to subscriber(s)
   */
  SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
  data->gci            = gci;
  data->tableId        = tableId;
  data->operation      = event;
  data->noOfAttributes = noOfAttrs;
  data->dataSize       =  dataLen;

  SubscriberPtr subbPtr;
  for(c_dataSubscribers.first(subbPtr); !subbPtr.isNull();
      c_dataSubscribers.next(subbPtr)){
    if (subbPtr.p->m_firstGCI > gci) {
#ifdef EVENT_DEBUG
      ndbout_c("m_firstGCI = %u, gci = %u", subbPtr.p->m_firstGCI, gci);
#endif
      jam();
      // we're either restarting or it's a newly created subscriber
      // and waiting for the right gci
      continue;
    }

    jam();

    const Uint32 ref = subbPtr.p->m_subscriberRef;
    //    ndbout_c("ref = %u", ref);
    const Uint32 subdata = subbPtr.p->m_subscriberData;
    data->senderData = subdata;
    /*
     * get subscription ptr for this subscriber
     */
    SubscriptionPtr subPtr;
    c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);

    if(!subPtr.p->m_tables[tableId]) {
      jam();
      continue;
      //continue in for-loop if the table is not part of 
      //the subscription. Otherwise, send data to subscriber.
    }
   
    if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
      if (replicaFlag) {
	jam();
	c_failoverBuffer.subTableData(gci,NULL,0);
	continue;
      }
      jam();
      Uint32 tmp = data->logType;
      if (c_lastInconsistentGCI == data->gci) {
	data->setGCINotConsistent();
      }

#ifdef HANDOVER_DEBUG
      {
	static int aLongGCIName = 0;
	if (data->gci != aLongGCIName) {
	  aLongGCIName = data->gci;
	  ndbout_c("sent from GCI = %u", aLongGCIName);
	}
      }
#endif
      sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
                 SubTableData::SignalLength, JBB, ptr, nptr);
      data->logType = tmp;
    } else {
      ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
      jam();
#if PRINT_ONLY
      ndbout_c("GSN_SUB_TABLE_DATA to %s: op: %d #attr: %d len: %d",
	       getBlockName(refToBlock(ref)), 
	       noOfAttrs, dataLen);
    
#else
#ifdef HANDOVER_DEBUG
      {
	static int aLongGCIName2 = 0;
	if (data->gci != aLongGCIName2) {
	  aLongGCIName2 = data->gci;
	  ndbout_c("(EXECUTE_DIRECT) sent from GCI = %u to %u", aLongGCIName2, ref);
	}
      }
#endif
      EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_TABLE_DATA, signal,
		     SubTableData::SignalLength);  
      jamEntry();
#endif    
    }
  }
  
  /**
   * Reset f_bufferLock
   */
  f_bufferLock = 0;
  b_bufferLock = 0;
}

void
SumaParticipant::execSUB_GCP_COMPLETE_REP(Signal* signal){
  jamEntry();

  SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();

  Uint32 gci = rep->gci;
  c_lastCompleteGCI = gci;

  /**
   * always send SUB_GCP_COMPLETE_REP to Grep (so 
   * Lars can do funky stuff calculating intervals,
   * even before the subscription is started
   */
  rep->senderRef  = reference();
  rep->senderData = 0; //ignored in grep
  EXECUTE_DIRECT(refToBlock(GREP_REF), GSN_SUB_GCP_COMPLETE_REP, signal,
		 SubGcpCompleteRep::SignalLength);  

  /**
   * Signal to subscriber(s)
   */

  SubscriberPtr subbPtr;
  SubscriptionPtr subPtr;
  c_dataSubscribers.first(subbPtr);
  for(; !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){

    if (subbPtr.p->m_firstGCI > gci) {
      jam();
      // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
      continue;
    }

    const Uint32 ref = subbPtr.p->m_subscriberRef;
    rep->senderRef  = ref;
    rep->senderData = subbPtr.p->m_subscriberData;

    c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
#if PRINT_ONLY
    ndbout_c("GSN_SUB_GCP_COMPLETE_REP to %s:",
	     getBlockName(refToBlock(ref)));
#else
    /**
     * Ignore sending to GREP (since we sent earlier)
     */
    if (ref == GREP_REF) {
      jam();
      continue;
    }

    CRASH_INSERTION(13018);

    if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
      {
	jam();
	sendSignal(ref, GSN_SUB_GCP_COMPLETE_REP, signal,
		   SubGcpCompleteRep::SignalLength, JBB);
      }
    else
      {
	jam();
	ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
	EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_GCP_COMPLETE_REP, signal,
		       SubGcpCompleteRep::SignalLength);  
	jamEntry();
      }
#endif    
  }

  if (c_handoverToDo) {
    jam();
    c_handoverToDo = false;
    for( int i = 0; i < NO_OF_BUCKETS; i++) {
      if (c_buckets[i].handover) {
	if (c_buckets[i].handoverGCI > gci) {
	  jam();
	  c_handoverToDo = true; // still waiting for the right GCI
	  break; /* since all handover should happen at the same time
		  * we can break here
		  */
	} else {
	  c_buckets[i].handover = false;
#ifdef HANDOVER_DEBUG
	  ndbout_c("Handover Bucket %u", i);
#endif
	  if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
	    // my bucket to be handed over to me
	    ndbrequire(!c_buckets[i].active);
	    jam();
	    c_buckets[i].active = true;
	  } else {
	    // someone else's bucket to handover to
	    ndbrequire(c_buckets[i].active);
	    jam();
	    c_buckets[i].active = false;
	  }
	}
      }
    }
  }
}

/***********************************************************
 *
 * Embryo to syncronize the Suma's so as to know if a subscriber
 * has received a GCP_COMPLETE from all suma's or not
 *
 */

void
SumaParticipant::runSUB_GCP_COMPLETE_ACC(Signal* signal){
  jam();

  SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();

  Uint32 gci = acc->rep.gci;

#ifdef EVENT_DEBUG
  ndbout_c("SumaParticipant::runSUB_GCP_COMPLETE_ACC gci = %u", gci);
#endif

  c_failoverBuffer.subGcpCompleteRep(gci);
}

void
Suma::execSUB_GCP_COMPLETE_ACC(Signal* signal){
  jamEntry();

  if (RtoI(signal->getSendersBlockRef(), false) != RNIL) {
    jam();
    // Ack from other SUMA
    runSUB_GCP_COMPLETE_ACC(signal);
    return;
  }

  jam();
  // Ack from User and not an acc from other SUMA, redistribute in nodegroup

  SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
  Uint32 gci = acc->rep.gci;
  Uint32 senderRef  = acc->rep.senderRef;
  Uint32 subscriberData = acc->rep.subscriberData;
  
#ifdef EVENT_DEBUG
  ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = %u", gci);
#endif
  bool moreToCome = false;

  SubscriberPtr subbPtr;
  for(c_dataSubscribers.first(subbPtr);
      !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
#ifdef EVENT_DEBUG
    ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC %u == %u && %u == %u",
	     subbPtr.p->m_subscriberRef,
	     senderRef,
	     subbPtr.p->m_subscriberData,
	     subscriberData);
#endif
    if (subbPtr.p->m_subscriberRef == senderRef &&
	subbPtr.p->m_subscriberData == subscriberData) {
      jam();
#ifdef EVENT_DEBUG
      ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = FOUND SUBSCRIBER");
#endif
      subbPtr.p->m_lastGCI = gci;
    } else if (subbPtr.p->m_lastGCI < gci) {
      jam();
      if (subbPtr.p->m_firstGCI <= gci)
	moreToCome = true;
    } else
      jam();
  }
  
  if (!moreToCome) {
    // tell the other SUMA's that I'm done with this GCI
    jam();
    for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
      Uint32 id = c_nodesInGroup[i];
      Uint32 ref = calcSumaBlockRef(id);
      if ((ref != reference()) && c_aliveNodes.get(id)) {
	jam();
	sendSignal(ref, GSN_SUB_GCP_COMPLETE_ACC, signal,
		   SubGcpCompleteAcc::SignalLength, JBB);
      } else
	jam();
    }
  }
}

static Uint32 tmpFailoverBuffer[512];
//SumaParticipant::FailoverBuffer::FailoverBuffer(DataBuffer<15>::DataBufferPool & p)
//  :  m_dataList(p), 
SumaParticipant::FailoverBuffer::FailoverBuffer()
  :
     c_gcis(tmpFailoverBuffer), c_sz(512), c_first(0), c_next(0), c_full(false)
{
}

bool SumaParticipant::FailoverBuffer::subTableData(Uint32 gci, Uint32 *src, int sz)
{
  bool ok = true;

  if (c_full) {
    ok = false;
#ifdef EVENT_DEBUG
    ndbout_c("Suma::FailoverBuffer::SubTableData buffer full gci=%u");
#endif
  } else {
    c_gcis[c_next] = gci;
    c_next++;
    if (c_next == c_sz) c_next = 0;
    if (c_next == c_first)
      c_full = true;
    //    ndbout_c("%u %u %u",c_first,c_next,c_sz);
  }
  return ok;
}
bool SumaParticipant::FailoverBuffer::subGcpCompleteRep(Uint32 gci)
{
  bool ok = true;

  //  ndbout_c("Empty");
  while (true) {
    if (c_first == c_next && !c_full)
      break;
    if (c_gcis[c_first] > gci)
      break;
    c_full = false;
    c_first++;
    if (c_first == c_sz) c_first = 0;
    //    ndbout_c("%u %u %u : ",c_first,c_next,c_sz);
  }

  return ok;
}
bool SumaParticipant::FailoverBuffer::nodeFailRep()
{
  bool ok = true;
  while (true) {
    if (c_first == c_next && !c_full)
      break;

#ifdef EVENT_DEBUG
    ndbout_c("Suma::FailoverBuffer::NodeFailRep resending gci=%u", c_gcis[c_first]);
#endif
    c_full = false;
    c_first++;
    if (c_first == c_sz) c_first = 0;
  }
  return ok;
}

/**********************************************************
 * Suma participant interface
 *
 * Stopping and removing of subscriber
 *
 */

void
SumaParticipant::execSUB_STOP_REQ(Signal* signal){
  jamEntry();
  
  CRASH_INSERTION(13019);

  SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
  Uint32 senderRef      = signal->getSendersBlockRef();
  Uint32 senderData     = req->senderData;
  Uint32 subscriberRef  = req->subscriberRef;
  Uint32 subscriberData = req->subscriberData;
  SubscriptionPtr subPtr;
  Subscription key; 
  key.m_subscriptionId  = req->subscriptionId;
  key.m_subscriptionKey = req->subscriptionKey;
  Uint32 part = req->part;
  
  if (key.m_subscriptionKey == 0 &&
      key.m_subscriptionId == 0 &&
      subscriberData == 0) {
    SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
    
    conf->senderRef       = reference();
    conf->senderData      = senderData;
    conf->subscriptionId  = key.m_subscriptionId;
    conf->subscriptionKey = key.m_subscriptionKey;
    conf->subscriberData  = subscriberData;

    sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
	       SubStopConf::SignalLength, JBB);

    removeSubscribersOnNode(signal, refToNode(subscriberRef));
    return;
  }

  if(!c_subscriptions.find(subPtr, key)){
    jam();
    sendSubStopRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);
    return;
  }
  
  ndbrequire(part == SubscriptionData::TableData);

  SubscriberPtr subbPtr;
  if (senderRef == reference()){
    jam();
    c_subscriberPool.getPtr(subbPtr, senderData);
    ndbrequire(subbPtr.p->m_subPtrI == subPtr.i && 
	       subbPtr.p->m_subscriberRef == subscriberRef &&
	       subbPtr.p->m_subscriberData == subscriberData);
    c_removeDataSubscribers.remove(subbPtr);
  } else {
    bool found = false;
    jam();
    c_dataSubscribers.first(subbPtr);
    for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
      jam();
      if (subbPtr.p->m_subPtrI == subPtr.i && 
	  subbPtr.p->m_subscriberRef == subscriberRef &&
	  subbPtr.p->m_subscriberData == subscriberData){
	//	ndbout_c("STOP_REQ: before c_dataSubscribers.release");
	jam();
	c_dataSubscribers.remove(subbPtr);
	found = true;
	break;
      }
    }
    /**
     * If we didn't find anyone, send ref
     */
    if (!found) {
      jam();
      sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND);
      return;
    }
  }

  subbPtr.p->m_senderRef  = senderRef; // store ref to requestor
  subbPtr.p->m_senderData = senderData; // store ref to requestor
  c_prepDataSubscribers.add(subbPtr);

  Ptr<SyncRecord> syncPtr;
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  if (syncPtr.p->m_locked) {
    jam();
    sendSubStopRef(signal, /** Error Code */ 0, true);
    return;
  }
  syncPtr.p->m_locked = true;

  syncPtr.p->startDropTrigger(signal);
}

void
SumaParticipant::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr){
  jam();

  CRASH_INSERTION(13020);

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);

  Ptr<SyncRecord> syncPtr;
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
  syncPtr.p->m_locked = false;

  SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
  
  conf->senderRef = reference();
  conf->senderData = subbPtr.p->m_senderData;
  conf->subscriptionId  = subPtr.p->m_subscriptionId;
  conf->subscriptionKey = subPtr.p->m_subscriptionKey;
  conf->subscriberData  = subbPtr.p->m_subscriberData;
  Uint32 senderRef = subbPtr.p->m_senderRef;

  c_prepDataSubscribers.release(subbPtr);
  sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
	     SubStopConf::SignalLength, JBB);
}

void
SumaParticipant::sendSubStopRef(Signal* signal, Uint32 errCode,
				bool temporary){
  jam();
  SubStopRef  * ref = (SubStopRef *)signal->getDataPtrSend();
  ref->senderRef = reference();
  ref->errorCode = errCode;
  if (temporary) {
    ref->setTemporary();
  }
  sendSignal(signal->getSendersBlockRef(), 
	     GSN_SUB_STOP_REF, 
	     signal, 
	     SubStopRef::SignalLength,
	     JBB);
  return;
}

/**************************************************************
 *
 * Removing subscription
 *
 */

void
SumaParticipant::execSUB_REMOVE_REQ(Signal* signal) {
  jamEntry();

  Uint32 senderRef = signal->getSendersBlockRef();

  CRASH_INSERTION(13021);

  const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
  SubscriptionPtr subPtr;
  Subscription key;
  key.m_subscriptionId  = req.subscriptionId;
  key.m_subscriptionKey = req.subscriptionKey;
  
  if(!c_subscriptions.find(subPtr, key)) {
    jam();
    sendSubRemoveRef(signal, req, (Uint32) GrepError::SUBSCRIPTION_ID_NOT_FOUND);
    return;
  }
  
  int count = 0;
  {
    jam();
    SubscriberPtr i_subbPtr;
    for(c_prepDataSubscribers.first(i_subbPtr);
	!i_subbPtr.isNull(); c_prepDataSubscribers.next(i_subbPtr)){
      jam();
      if( i_subbPtr.p->m_subPtrI == subPtr.i ) {
	jam();
	sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true);
	return;
	//	c_prepDataSubscribers.release(subbPtr);
      }
    }
    c_dataSubscribers.first(i_subbPtr);
    while(!i_subbPtr.isNull()){
      jam();
      SubscriberPtr subbPtr = i_subbPtr;
      c_dataSubscribers.next(i_subbPtr);
      if( subbPtr.p->m_subPtrI == subPtr.i ) {
	jam();
	sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true);
	return;
	/* Unfinished/untested code.  If remove should be possible
	 * even if subscribers are left these have to be stopped 
	 * first. See m_markRemove, m_nSubscribers. We need also to
	 * block remove for this subscription so that multiple
	 * removes is not possible...
	 */
	c_dataSubscribers.remove(subbPtr);
	c_removeDataSubscribers.add(subbPtr);
	count++;
      }
    }
    c_metaSubscribers.first(i_subbPtr);
    while(!i_subbPtr.isNull()){
      jam();
      SubscriberPtr subbPtr = i_subbPtr;
      c_metaSubscribers.next(i_subbPtr);
      if( subbPtr.p->m_subPtrI == subPtr.i ){
	jam();
	c_metaSubscribers.release(subbPtr);
      }
    }
  }

  subPtr.p->m_senderRef  = senderRef;
  subPtr.p->m_senderData = req.senderData;

  if (count > 0){
    jam();
    ndbrequire(false); // code not finalized
    subPtr.p->m_markRemove = true;
    subPtr.p->m_nSubscribers = count;
    sendSubStopReq(signal);
  } else {
    completeSubRemoveReq(signal, subPtr);
  }
}

void
SumaParticipant::completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr) {
  Uint32 subscriptionId  = subPtr.p->m_subscriptionId;
  Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
  Uint32 senderRef       = subPtr.p->m_senderRef;
  Uint32 senderData      = subPtr.p->m_senderData;

  {
    Ptr<SyncRecord> syncPtr;
    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
	
    syncPtr.p->release();
    c_syncPool.release(syncPtr);
  }

  //  if (subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
  //    jam();
  //    senderRef = subPtr.p->m_subscriberRef;
  //  }
  c_subscriptions.release(subPtr);

  /**
   * I was the last subscription to be remove so clear c_tables
   */
#if 0
  ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
	   c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree()+1);
#endif

  if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()+1) {
    jam();
#if 0
    ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
#endif
    KeyTable<Table>::Iterator it;
    for(c_tables.first(it); !it.isNull(); ){
      
      it.curr.p->release(* this);
      
      TablePtr tabPtr = it.curr;
      
      c_tables.next(it);
      c_tables.release(tabPtr);
    }
  }
  
  SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
  conf->senderRef            = reference();
  conf->senderData           = senderData;
  conf->subscriptionId       = subscriptionId;
  conf->subscriptionKey      = subscriptionKey;

  sendSignal(senderRef, GSN_SUB_REMOVE_CONF, signal,
	     SubRemoveConf::SignalLength, JBB);
}

void
SumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
				  Uint32 errCode, bool temporary){
  jam();
  SubRemoveRef  * ref = (SubRemoveRef *)signal->getDataPtrSend();
  ref->senderRef  = reference();
  ref->senderData = req.senderData;
  ref->err = errCode;
  if (temporary)
    ref->setTemporary();
  releaseSections(signal);
  sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF, 
	     signal, SubRemoveRef::SignalLength, JBB);
  return;
}

void
SumaParticipant::Table::release(SumaParticipant & suma){
  jam();

  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
  attrBuf.release();

  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
  fragBuf.release();
}

void
SumaParticipant::SyncRecord::release(){
  jam();
  m_tableList.release();

  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
  attrBuf.release();  
}


/**************************************************************
 *
 * Restarting remote node functions, master functionality
 * (slave does nothing special)
 * - triggered on INCL_NODEREQ calling startNode
 * - included node will issue START_ME when it's ready to start
 * the subscribers
 *
 */

Suma::Restart::Restart(Suma& s) : suma(s) {
  for (int i = 0; i < MAX_REPLICAS; i++) {
    c_okToStart[i]      = false;
    c_waitingToStart[i] = false;
  }
};

void
Suma::Restart::resetNode(Uint32 sumaRef)
{
  jam();
  int I = suma.RtoI(sumaRef);
  c_okToStart[I] = false;
  c_waitingToStart[I] = false;
}

void
Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
{
  jam();
  resetNode(sumaRef);

  // right now we can only handle restarting one node
  // at a time in a node group

  createSubscription(signal, sumaRef);
}

void 
Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef) {
  jam();
  suma.c_subscriptions.first(c_subPtr);
  nextSubscription(signal, sumaRef);
}

void 
Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) {
  jam();
  if (c_subPtr.isNull()) {
    jam();
    completeSubscription(signal, sumaRef);
    return;
  }
  SubscriptionPtr subPtr;
  subPtr.i = c_subPtr.curr.i;
  subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);

  suma.c_subscriptions.next(c_subPtr);

  SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
      
  req->subscriberRef    = suma.reference();
  req->subscriberData   = subPtr.i;
  req->subscriptionId   = subPtr.p->m_subscriptionId;
  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  req->subscriptionType = subPtr.p->m_subscriptionType |
    SubCreateReq::RestartFlag;

  switch (subPtr.p->m_subscriptionType) {
  case SubCreateReq::TableEvent:
  case SubCreateReq::SelectiveTableSnapshot:
  case SubCreateReq::DatabaseSnapshot: {
    jam();
      
    Ptr<SyncRecord> syncPtr;
    suma.c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
    syncPtr.p->m_tableList.first(syncPtr.p->m_tableList_it);

    ndbrequire(!syncPtr.p->m_tableList_it.isNull());

    req->tableId = *syncPtr.p->m_tableList_it.data;
      
#if 0
    for (int i = 0; i < MAX_TABLES; i++)
      if (subPtr.p->m_tables[i]) {
	req->tableId = i;
	break;
      }
#endif

    suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
		    SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
    return;
  }
  case SubCreateReq::SingleTableScan :
    // TODO
    jam();
    return;
  }
  ndbrequire(false);
}

void 
Suma::execSUB_CREATE_CONF(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_CREATE_CONF");
#endif

  const Uint32 senderRef = signal->senderBlockRef();

  SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();

  Subscription key;
  const Uint32 subscriberData = conf->subscriberData;
  key.m_subscriptionId        = conf->subscriptionId;
  key.m_subscriptionKey       = conf->subscriptionKey;
  
  SubscriptionPtr subPtr;
  ndbrequire(c_subscriptions.find(subPtr, key));

  switch(subPtr.p->m_subscriptionType) {
  case SubCreateReq::TableEvent:
  case SubCreateReq::SelectiveTableSnapshot:
  case SubCreateReq::DatabaseSnapshot:
    {
      Ptr<SyncRecord> syncPtr;
      c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);

      syncPtr.p->m_tableList.next(syncPtr.p->m_tableList_it);
      if (syncPtr.p->m_tableList_it.isNull()) {
	jam();
	SubSyncReq *req = (SubSyncReq *)signal->getDataPtrSend();
    
	req->subscriptionId  = key.m_subscriptionId;
	req->subscriptionKey = key.m_subscriptionKey;
	req->subscriberData  = subscriberData;
	req->part            = (Uint32) SubscriptionData::MetaData;

	sendSignal(senderRef, GSN_SUB_SYNC_REQ, signal,
		   SubSyncReq::SignalLength, JBB);
      } else {
	jam();
	SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
      
	req->subscriberRef    = reference();
	req->subscriberData   = subPtr.i;
	req->subscriptionId   = subPtr.p->m_subscriptionId;
	req->subscriptionKey  = subPtr.p->m_subscriptionKey;
	req->subscriptionType = subPtr.p->m_subscriptionType |
	  SubCreateReq::RestartFlag |
	  SubCreateReq::AddTableFlag;

	req->tableId = *syncPtr.p->m_tableList_it.data;

	sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
		   SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
      }
    }
    return;
  case SubCreateReq::SingleTableScan:
    ndbrequire(false);
  }
  ndbrequire(false);
}

void 
Suma::execSUB_CREATE_REF(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_CREATE_REF");
#endif
  //ndbrequire(false);
}

void 
Suma::execSUB_SYNC_CONF(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_SYNC_CONF");
#endif
  Uint32 sumaRef = signal->getSendersBlockRef();

  SubSyncConf *conf = (SubSyncConf *)signal->getDataPtr();
  Subscription key;

  key.m_subscriptionId            = conf->subscriptionId;
  key.m_subscriptionKey           = conf->subscriptionKey;
  //  SubscriptionData::Part part     = (SubscriptionData::Part)conf->part;
  //  const Uint32 subscriberData     = conf->subscriberData;

  SubscriptionPtr subPtr;
  c_subscriptions.find(subPtr, key);

  switch(subPtr.p->m_subscriptionType) {
  case SubCreateReq::TableEvent:
  case SubCreateReq::SelectiveTableSnapshot:
  case SubCreateReq::DatabaseSnapshot:
    jam();
    Restart.nextSubscription(signal, sumaRef);
    return;
  case SubCreateReq::SingleTableScan:
    ndbrequire(false);
    return;
  }
  ndbrequire(false);
}

void 
Suma::execSUB_SYNC_REF(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_SYNC_REF");
#endif
  //ndbrequire(false);
}

void
Suma::execSUMA_START_ME(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUMA_START_ME");
#endif

  Restart.runSUMA_START_ME(signal, signal->getSendersBlockRef());
}

void
Suma::Restart::runSUMA_START_ME(Signal* signal, Uint32 sumaRef) {
  int I = suma.RtoI(sumaRef);

  // restarting Suma is ready for SUB_START_REQ
  if (c_waitingToStart[I]) {
    // we've waited with startSubscriber since restarting suma was not ready
    c_waitingToStart[I] = false;
    startSubscriber(signal, sumaRef);
  } else {
    // do startSubscriber as soon as its time
    c_okToStart[I] = true;
  }
}

void 
Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef) {
  jam();
  int I = suma.RtoI(sumaRef);

  if (c_okToStart[I]) {// otherwise will start when START_ME comes
    c_okToStart[I] = false;
    startSubscriber(signal, sumaRef);
  } else {
    c_waitingToStart[I] = true;
  }
}

void 
Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef) {
  jam();
  suma.c_dataSubscribers.first(c_subbPtr);
  nextSubscriber(signal, sumaRef);
}

void
Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
			       Signal* signal, Uint32 sumaRef)
{
  jam();
  SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
      
  req->senderRef        = suma.reference();
  req->senderData       = subbPtr.p->m_senderData;
  req->subscriptionId   = subPtr.p->m_subscriptionId;
  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  req->part             = SubscriptionData::TableData;
  req->subscriberData   = subbPtr.p->m_subscriberData;
  req->subscriberRef    = subbPtr.p->m_subscriberRef;
      
  // restarting suma will not respond to this until startphase 5
  // since it is not until then data copying has been completed
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::Restart::sendSubStartReq sending GSN_SUB_START_REQ id=%u key=%u",
	   req->subscriptionId, req->subscriptionKey);
#endif
  suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
		  signal, SubStartReq::SignalLength2, JBB);
}

void 
Suma::execSUB_START_CONF(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_START_CONF");
#endif
  Uint32 sumaRef = signal->getSendersBlockRef();
  Restart.nextSubscriber(signal, sumaRef);
}

void 
Suma::execSUB_START_REF(Signal* signal) {
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_START_REF");
#endif
  //ndbrequire(false);
}

void 
Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef) {
  jam();
  if (c_subbPtr.isNull()) {
    jam();
    completeSubscriber(signal, sumaRef);
    return;
  }
  
  SubscriberPtr subbPtr = c_subbPtr;
  suma.c_dataSubscribers.next(c_subbPtr);

  /*
   * get subscription ptr for this subscriber
   */

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  switch (subPtr.p->m_subscriptionType) {
  case SubCreateReq::TableEvent:
  case SubCreateReq::SelectiveTableSnapshot:
  case SubCreateReq::DatabaseSnapshot:
    {
      jam();
      sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
#if 0
      SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
      
      req->senderRef        = reference();
      req->senderData       = subbPtr.p->m_senderData;
      req->subscriptionId   = subPtr.p->m_subscriptionId;
      req->subscriptionKey  = subPtr.p->m_subscriptionKey;
      req->part             = SubscriptionData::TableData;
      req->subscriberData   = subbPtr.p->m_subscriberData;
      req->subscriberRef    = subbPtr.p->m_subscriberRef;
      
      // restarting suma will not respond to this until startphase 5
      // since it is not until then data copying has been completed
#ifdef NODEFAIL_DEBUG
      ndbout_c("Suma::nextSubscriber sending GSN_SUB_START_REQ id=%u key=%u",
	       req->subscriptionId, req->subscriptionKey);
#endif
      suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
		      signal, SubStartReq::SignalLength2, JBB);
#endif
    }
  return;
  case SubCreateReq::SingleTableScan:
    ndbrequire(false);
    return;
  }
  ndbrequire(false);
}

void 
Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef) {
  completeRestartingNode(signal, sumaRef);
}

void
Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef) {
  jam();
  SumaHandoverReq * req = (SumaHandoverReq *)signal->getDataPtrSend();

  req->gci = suma.getFirstGCI(signal);

  suma.sendSignal(sumaRef, GSN_SUMA_HANDOVER_REQ, signal,
		  SumaHandoverReq::SignalLength, JBB);
}

// only run on restarting suma

void
Suma::execSUMA_HANDOVER_REQ(Signal* signal)
{
  jamEntry();
  //  Uint32 sumaRef = signal->getSendersBlockRef();
  SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();

  Uint32 gci = req->gci;
  Uint32 new_gci = getFirstGCI(signal);

  if (new_gci > gci) {
    gci = new_gci;
  }

  { // all recreated subscribers at restarting SUMA start at same GCI
    SubscriberPtr subbPtr;
    for(c_dataSubscribers.first(subbPtr);
	!subbPtr.isNull();
	c_dataSubscribers.next(subbPtr)){
      subbPtr.p->m_firstGCI = gci;
    }
  }

#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUMA_HANDOVER_REQ, gci = %u", gci);
#endif

  c_handoverToDo = false;
  c_restartLock = false;
  {
#ifdef HANDOVER_DEBUG
    int c = 0;
#endif
    for( int i = 0; i < NO_OF_BUCKETS; i++) {
      jam();
      if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
#ifdef HANDOVER_DEBUG
	c++;
#endif
        jam();
	c_buckets[i].active = false;
	c_buckets[i].handoverGCI = gci;
	c_buckets[i].handover = true;
	c_buckets[i].handover_started = false;
	c_handoverToDo = true;
      }
    }
#ifdef HANDOVER_DEBUG
    ndbout_c("prepared handover of bucket %u buckets", c);
#endif
  }

  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
    jam();
    Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
    if (ref != reference()) {
      jam();
      sendSignal(ref, GSN_SUMA_HANDOVER_CONF, signal,
		 SumaHandoverConf::SignalLength, JBB);
    }//if
  }
}

// only run on all but restarting suma
void
Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
  jamEntry();
  Uint32 sumaRef = signal->getSendersBlockRef();
  SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();

  Uint32 gci = conf->gci;

#ifdef HANDOVER_DEBUG
  ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
#endif

  /* TODO, if we are restarting several SUMA's (>2 in a nodegroup)
   * we have to collect all these conf's before proceding
   */

  // restarting node is now prepared and ready
  c_preparingNodes.clear(refToNode(sumaRef)); /* !! important to do before
					       * below since it affects
					       * getResponsibleSumaNodeId()
					       */

  c_handoverToDo = false;
  // mark all active buckets really belonging to restarting SUMA
  for( int i = 0; i < NO_OF_BUCKETS; i++) {
    if (c_buckets[i].active) {
      // I'm running this bucket
      if (getResponsibleSumaNodeId(i) == refToNode(sumaRef)) {
	// but it should really be the restarted node
	c_buckets[i].handoverGCI = gci;
	c_buckets[i].handover = true;
	c_buckets[i].handover_started = false;
	c_handoverToDo = true;
      }
    }
  }
}

template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);