/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #define DBTUX_NODE_CPP #include "Dbtux.hpp" /* * Node handles. * * Temporary version between "cache" and "pointer" implementations. */ // Dbtux void Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) { if (! c_nodeHandlePool.seize(nodePtr)) { jam(); return; } new (nodePtr.p) NodeHandle(frag); nodePtr.p->m_next = frag.m_nodeList; frag.m_nodeList = nodePtr.i; } void Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode) { ndbrequire(frag.m_nodeFree == RNIL); NodeHandlePtr nodePtr; seizeNode(signal, frag, nodePtr); ndbrequire(nodePtr.i != RNIL); // remove from cache XXX ugly frag.m_nodeFree = frag.m_nodeList; frag.m_nodeList = nodePtr.p->m_next; // alloc index node in TUP Uint32 pageId = NullTupLoc.m_pageId; Uint32 pageOffset = NullTupLoc.m_pageOffset; Uint32* node32 = 0; errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); if (errorCode != 0) { jam(); c_nodeHandlePool.release(nodePtr); frag.m_nodeFree = RNIL; return; } nodePtr.p->m_loc = TupLoc(pageId, pageOffset); nodePtr.p->m_node = reinterpret_cast<TreeNode*>(node32); ndbrequire(nodePtr.p->m_loc != NullTupLoc && nodePtr.p->m_node != 0); new (nodePtr.p->m_node) TreeNode(); #ifdef VM_TRACE TreeHead& tree = frag.m_tree; TreeNode* node = nodePtr.p->m_node; memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2); memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2); TreeEnt* entList = tree.getEntList(node); memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); #endif } /* * Find node in the cache. XXX too slow, use direct links instead */ void Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc) { NodeHandlePtr tmpPtr; tmpPtr.i = frag.m_nodeList; while (tmpPtr.i != RNIL) { jam(); c_nodeHandlePool.getPtr(tmpPtr); if (tmpPtr.p->m_loc == loc) { jam(); nodePtr = tmpPtr; return; } tmpPtr.i = tmpPtr.p->m_next; } nodePtr.i = RNIL; nodePtr.p = 0; } /* * Get handle for existing node. */ void Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc) { ndbrequire(loc != NullTupLoc && acc > AccNone); NodeHandlePtr tmpPtr; // search in cache findNode(signal, frag, tmpPtr, loc); if (tmpPtr.i == RNIL) { jam(); // add new node seizeNode(signal, frag, tmpPtr); ndbrequire(tmpPtr.i != RNIL); tmpPtr.p->m_loc = loc; Uint32 pageId = loc.m_pageId; Uint32 pageOffset = loc.m_pageOffset; Uint32* node32 = 0; c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); tmpPtr.p->m_node = reinterpret_cast<TreeNode*>(node32); ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0); } if (tmpPtr.p->m_acc < acc) { jam(); accessNode(signal, frag, tmpPtr, acc); } nodePtr = tmpPtr; } /* * Create new node in the cache using the pre-allocated node. */ void Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) { ndbrequire(acc > AccNone); NodeHandlePtr tmpPtr; // use the pre-allocated node tmpPtr.i = frag.m_nodeFree; frag.m_nodeFree = RNIL; c_nodeHandlePool.getPtr(tmpPtr); // move it to the cache tmpPtr.p->m_next = frag.m_nodeList; frag.m_nodeList = tmpPtr.i; tmpPtr.p->m_acc = acc; nodePtr = tmpPtr; } /* * Delete existing node. */ void Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) { NodeHandlePtr tmpPtr = nodePtr; ndbrequire(tmpPtr.p->getOccup() == 0); Uint32 pageId = tmpPtr.p->m_loc.m_pageId; Uint32 pageOffset = tmpPtr.p->m_loc.m_pageOffset; Uint32* node32 = reinterpret_cast<Uint32*>(tmpPtr.p->m_node); c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); // invalidate handle and storage tmpPtr.p->m_loc = NullTupLoc; tmpPtr.p->m_node = 0; // scans have already been moved by nodePopDown or nodePopUp } /* * Access more of the node. */ void Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) { NodeHandlePtr tmpPtr = nodePtr; ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0); if (tmpPtr.p->m_acc >= acc) return; // XXX could do prefetch tmpPtr.p->m_acc = acc; } /* * Set prefix. */ void Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; ReadPar readPar; ndbrequire(i <= 1); readPar.m_ent = node.getMinMax(i); readPar.m_first = 0; readPar.m_count = frag.m_numAttrs; // leave in signal data readPar.m_data = 0; // XXX implement max words to read tupReadAttrs(signal, frag, readPar); // copy whatever fits CopyPar copyPar; copyPar.m_items = readPar.m_count; copyPar.m_headers = true; copyPar.m_maxwords = tree.m_prefSize; Data pref = node.getPref(i); copyAttrs(pref, readPar.m_data, copyPar); } /* * Commit and release nodes at the end of an operation. Used also on * error since no changes have been made (updateOk false). */ void Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk) { NodeHandlePtr nodePtr; nodePtr.i = frag.m_nodeList; frag.m_nodeList = RNIL; while (nodePtr.i != RNIL) { c_nodeHandlePool.getPtr(nodePtr); // release NodeHandlePtr tmpPtr = nodePtr; nodePtr.i = nodePtr.p->m_next; c_nodeHandlePool.release(tmpPtr); } } // node operations /* * Add entry at position. Move entries greater than or equal to the old * one (if any) to the right. * * X * v * A B C D E _ _ => A B C X D E _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup < tree.m_maxOccup && pos <= occup); // fix scans ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); if (scanPos.m_pos >= pos) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At pushUp pos=" << pos << " " << node << endl; } #endif scanPos.m_pos++; } scanPtr.i = scanPtr.p->m_nodeScan; } // fix node TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; for (unsigned i = occup; i > pos; i--) { jam(); tmpList[i] = tmpList[i - 1]; } tmpList[pos] = ent; entList[0] = entList[occup + 1]; node.setOccup(occup + 1); // fix prefixes if (occup == 0 || pos == 0) setNodePref(signal, node, 0); if (occup == 0 || pos == occup) setNodePref(signal, node, 1); } /* * Remove and return entry at position. Move entries greater than the * removed one to the left. This is the opposite of nodePushUp. * * D * ^ ^ * A B C D E F _ => A B C E F _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); ScanOpPtr scanPtr; // move scans whose entry disappears scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); const Uint32 nextPtrI = scanPtr.p->m_nodeScan; if (scanPos.m_pos == pos) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At popDown pos=" << pos << " " << node << endl; } #endif scanNext(signal, scanPtr); } scanPtr.i = nextPtrI; } // fix other scans scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_pos != pos); if (scanPos.m_pos > pos) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At popDown pos=" << pos << " " << node << endl; } #endif scanPos.m_pos--; } scanPtr.i = scanPtr.p->m_nodeScan; } // fix node TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; ent = tmpList[pos]; for (unsigned i = pos; i < occup - 1; i++) { jam(); tmpList[i] = tmpList[i + 1]; } entList[0] = entList[occup - 1]; node.setOccup(occup - 1); // fix prefixes if (occup != 1 && pos == 0) setNodePref(signal, node, 0); if (occup != 1 && pos == occup - 1) setNodePref(signal, node, 1); } /* * Add entry at existing position. Move entries less than or equal to * the old one to the left. Remove and return old min entry. * * X A * ^ v ^ * A B C D E _ _ => B C D X E _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); ScanOpPtr scanPtr; // move scans whose entry disappears scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); const Uint32 nextPtrI = scanPtr.p->m_nodeScan; if (scanPos.m_pos == 0) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At pushDown pos=" << pos << " " << node << endl; } #endif // here we may miss a valid entry "X" XXX known bug scanNext(signal, scanPtr); } scanPtr.i = nextPtrI; } // fix other scans scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_pos != 0); if (scanPos.m_pos <= pos) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At pushDown pos=" << pos << " " << node << endl; } #endif scanPos.m_pos--; } scanPtr.i = scanPtr.p->m_nodeScan; } // fix node TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; TreeEnt oldMin = tmpList[0]; for (unsigned i = 0; i < pos; i++) { jam(); tmpList[i] = tmpList[i + 1]; } tmpList[pos] = ent; ent = oldMin; entList[0] = entList[occup]; // fix prefixes if (true) setNodePref(signal, node, 0); if (occup == 1 || pos == occup - 1) setNodePref(signal, node, 1); } /* * Remove and return entry at position. Move entries less than the * removed one to the right. Replace min entry by the input entry. * This is the opposite of nodePushDown. * * X D * v ^ ^ * A B C D E _ _ => X A B C E _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 */ void Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); ScanOpPtr scanPtr; // move scans whose entry disappears scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); const Uint32 nextPtrI = scanPtr.p->m_nodeScan; if (scanPos.m_pos == pos) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At popUp pos=" << pos << " " << node << endl; } #endif // here we may miss a valid entry "X" XXX known bug scanNext(signal, scanPtr); } scanPtr.i = nextPtrI; } // fix other scans scanPtr.i = node.getNodeScan(); while (scanPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_pos != pos); if (scanPos.m_pos < pos) { jam(); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "At popUp pos=" << pos << " " << node << endl; } #endif scanPos.m_pos++; } scanPtr.i = scanPtr.p->m_nodeScan; } // fix node TreeEnt* const entList = tree.getEntList(node.m_node); entList[occup] = entList[0]; TreeEnt* const tmpList = entList + 1; TreeEnt newMin = ent; ent = tmpList[pos]; for (unsigned i = pos; i > 0; i--) { jam(); tmpList[i] = tmpList[i - 1]; } tmpList[0] = newMin; entList[0] = entList[occup]; // fix prefixes if (true) setNodePref(signal, node, 0); if (occup == 1 || pos == occup - 1) setNodePref(signal, node, 1); } /* * Move all possible entries from another node before the min (i=0) or * after the max (i=1). XXX can be optimized */ void Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i) { Frag& frag = dstNode.m_frag; TreeHead& tree = frag.m_tree; ndbrequire(i <= 1); while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) { TreeEnt ent; nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); } } /* * Link scan to the list under the node. The list is single-linked and * ordering does not matter. */ void Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr) { #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "To node " << node << endl; } #endif ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL); scanPtr.p->m_nodeScan = node.getNodeScan(); node.setNodeScan(scanPtr.i); } /* * Unlink a scan from the list under the node. */ void Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr) { #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl; debugOut << "From node " << node << endl; } #endif ScanOpPtr currPtr; currPtr.i = node.getNodeScan(); ScanOpPtr prevPtr; prevPtr.i = RNIL; while (true) { jam(); c_scanOpPool.getPtr(currPtr); Uint32 nextPtrI = currPtr.p->m_nodeScan; if (currPtr.i == scanPtr.i) { jam(); if (prevPtr.i == RNIL) { node.setNodeScan(nextPtrI); } else { jam(); prevPtr.p->m_nodeScan = nextPtrI; } scanPtr.p->m_nodeScan = RNIL; // check for duplicates ndbrequire(! islinkScan(node, scanPtr)); return; } prevPtr = currPtr; currPtr.i = nextPtrI; } } /* * Check if a scan is linked to this node. Only for ndbrequire. */ bool Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr) { ScanOpPtr currPtr; currPtr.i = node.getNodeScan(); while (currPtr.i != RNIL) { jam(); c_scanOpPool.getPtr(currPtr); if (currPtr.i == scanPtr.i) { jam(); return true; } currPtr.i = currPtr.p->m_nodeScan; } return false; } void Dbtux::NodeHandle::progError(int line, int cause, const char* file) { ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line); }