Commit becde519 authored by unknown's avatar unknown
Browse files

tux optim 5 - move node ops to class level (prepare to remove node cache)

parent e1179bc7
Loading
Loading
Loading
Loading
+18 −21
Original line number Diff line number Diff line
@@ -505,17 +505,15 @@ private:
  struct NodeHandle;
  friend struct NodeHandle;
  struct NodeHandle {
    Dbtux& m_tux;               // this block
    Frag& m_frag;               // fragment using the node
    TupLoc m_loc;               // physical node address
    TreeNode* m_node;           // pointer to node storage
    AccSize m_acc;              // accessed size
    union {
    Uint32 m_next;              // next active node under fragment
    Uint32 nextPool;
    };
    TreeNode* m_node;           // pointer to node storage
    Uint32 m_cache[MaxTreeNodeSize];
    NodeHandle(Dbtux& tux, Frag& frag);
    NodeHandle(Frag& frag);
    // getters
    TupLoc getLink(unsigned i);
    unsigned getChilds();       // cannot spell
@@ -532,17 +530,8 @@ private:
    void setOccup(unsigned n);
    void setBalance(int b);
    void setNodeScan(Uint32 scanPtrI);
    // operations  XXX maybe these should move to Dbtux level
    void pushUp(Signal* signal, unsigned pos, const TreeEnt& ent);
    void popDown(Signal* signal, unsigned pos, TreeEnt& ent);
    void pushDown(Signal* signal, unsigned pos, TreeEnt& ent);
    void popUp(Signal* signal, unsigned pos, TreeEnt& ent);
    void slide(Signal* signal, Ptr<NodeHandle> nodePtr, unsigned i);
    void linkScan(Dbtux::ScanOpPtr scanPtr);
    void unlinkScan(Dbtux::ScanOpPtr scanPtr);
    bool islinkScan(Dbtux::ScanOpPtr scanPtr);
    // for ndbrequire
    void progError(int line, int cause, const char* extra);
    // for ndbrequire and ndbassert
    void progError(int line, int cause, const char* file);
  };
  typedef Ptr<NodeHandle> NodeHandlePtr;
  ArrayPool<NodeHandle> c_nodeHandlePool;
@@ -656,7 +645,6 @@ private:
  void execTUX_MAINT_REQ(Signal* signal);
  void tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar);
  void tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar);
  void tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar);
  
  /*
   * DbtuxNode.cpp
@@ -668,8 +656,18 @@ private:
  void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
  void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
  void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
  void setNodePref(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
  void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
  void commitNodes(Signal* signal, Frag& frag, bool updateOk);
  // node operations
  void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
  void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
  void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
  void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
  void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i);
  // scans linked to node
  void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
  void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
  bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);

  /*
   * DbtuxTree.cpp
@@ -1084,13 +1082,12 @@ Dbtux::FragOp::FragOp() :
// Dbtux::NodeHandle

inline
Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) :
  m_tux(tux),
Dbtux::NodeHandle::NodeHandle(Frag& frag) :
  m_frag(frag),
  m_loc(),
  m_node(0),
  m_acc(AccNone),
  m_next(RNIL),
  m_node(0)
  m_next(RNIL)
{
}

+0 −94
Original line number Diff line number Diff line
@@ -270,97 +270,3 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar)
  readPar.m_count = numKeys;
  readPar.m_size = copyPar.m_numwords;
}

/*
 * Operate on index node tuple in TUP.  The data is copied between node
 * cache and index storage via signal data.
 */
void
Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar)
{
  const TreeHead& tree = frag.m_tree;
  // define the direct signal
  TupStoreTh* req = (TupStoreTh*)signal->getDataPtrSend();
  req->errorCode = RNIL;
  req->tableId = frag.m_indexId;
  req->fragId = frag.m_fragId;
  req->fragPtrI = frag.m_tupIndexFragPtrI;
  req->tupAddr = RNIL;  // no longer used
  req->tupVersion = 0;  // no longer used
  req->pageId = nodePtr.p->m_loc.m_pageId;
  req->pageOffset = nodePtr.p->m_loc.m_pageOffset;
  req->bufferId = 0;
  req->opCode = storePar.m_opCode;
  ndbrequire(storePar.m_offset + storePar.m_size <= tree.m_nodeSize);
  req->dataOffset = storePar.m_offset;
  req->dataSize = storePar.m_size;
  // the node cache
  ndbrequire(nodePtr.p->m_node != 0);
  // the buffer in signal data
  Uint32* const buffer = (Uint32*)req + TupStoreTh::SignalLength;
  // copy in data
  switch (storePar.m_opCode) {
  case TupStoreTh::OpRead:
    jam();
 #ifdef VM_TRACE
    {
      Uint32* dst = buffer + storePar.m_offset;
      memset(dst, 0xa9, storePar.m_size << 2);
    }
 #endif
    break;
  case TupStoreTh::OpInsert:
    jam();
    // fallthru
  case TupStoreTh::OpUpdate:
    jam();
    // copy from cache to signal data
    {
      Uint32* dst = buffer + storePar.m_offset;
      const Uint32* src = (const Uint32*)nodePtr.p->m_node + storePar.m_offset;
      memcpy(dst, src, storePar.m_size << 2);
    }
    break;
  case TupStoreTh::OpDelete:
    jam();
    break;
  default:
    ndbrequire(false);
    break;
  }
  // execute
  EXECUTE_DIRECT(DBTUP, GSN_TUP_STORE_TH, signal, TupStoreTh::SignalLength);
  jamEntry();
  if (req->errorCode != 0) {
    jam();
    storePar.m_errorCode = req->errorCode;
    return;
  }
  ndbrequire(req->errorCode == 0);
  // copy out data
  switch (storePar.m_opCode) {
  case TupStoreTh::OpRead:
    jam();
    {
      Uint32* dst = (Uint32*)nodePtr.p->m_node + storePar.m_offset;
      const Uint32* src = (const Uint32*)buffer + storePar.m_offset;
      memcpy(dst, src, storePar.m_size << 2);
    }
    break;
  case TupStoreTh::OpInsert:
    jam();
    nodePtr.p->m_loc.m_pageId = req->pageId;
    nodePtr.p->m_loc.m_pageOffset = req->pageOffset;
    break;
  case TupStoreTh::OpUpdate:
    jam();
    break;
  case TupStoreTh::OpDelete:
    jam();
    nodePtr.p->m_loc = NullTupLoc;
    break;
  default:
    ndbrequire(false);
    break;
  }
}
+111 −105
Original line number Diff line number Diff line
@@ -32,7 +32,7 @@ Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
    jam();
    return;
  }
  new (nodePtr.p) NodeHandle(*this, frag);
  new (nodePtr.p) NodeHandle(frag);
  nodePtr.p->m_next = frag.m_nodeList;
  frag.m_nodeList = nodePtr.i;
}
@@ -158,7 +158,7 @@ Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
  // invalidate handle and storage
  tmpPtr.p->m_loc = NullTupLoc;
  tmpPtr.p->m_node = 0;
  // scans have already been moved by popDown or popUp
  // scans have already been moved by nodePopDown or nodePopUp
}

/*
@@ -179,8 +179,9 @@ Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize ac
 * Set prefix.
 */
void
Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
{
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  ReadPar readPar;
  ndbrequire(i <= 1);
@@ -219,7 +220,7 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
  }
}

// Dbtux::NodeHandle
// node operations

/*
 * Add entry at position.  Move entries greater than or equal to the old
@@ -231,25 +232,26 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
{
  TreeHead& tree = m_frag.m_tree;
  const unsigned occup = getOccup();
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
  ndbrequire(occup < tree.m_maxOccup && pos <= occup);
  // fix scans
  ScanOpPtr scanPtr;
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    if (scanPos.m_pos >= pos) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At pushUp pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At pushUp pos=" << pos << " " << node << endl;
      }
#endif
      scanPos.m_pos++;
@@ -257,7 +259,7 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
  TreeEnt* const entList = tree.getEntList(m_node);
  TreeEnt* const entList = tree.getEntList(node.m_node);
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  for (unsigned i = occup; i > pos; i--) {
@@ -266,17 +268,17 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
  }
  tmpList[pos] = ent;
  entList[0] = entList[occup + 1];
  setOccup(occup + 1);
  node.setOccup(occup + 1);
  // fix prefixes
  if (occup == 0 || pos == 0)
    m_tux.setNodePref(signal, m_frag, *this, 0);
    setNodePref(signal, node, 0);
  if (occup == 0 || pos == occup)
    m_tux.setNodePref(signal, m_frag, *this, 1);
    setNodePref(signal, node, 1);
}

/*
 * Remove and return entry at position.  Move entries greater than the
 * removed one to the left.  This is the opposite of pushUp.
 * removed one to the left.  This is the opposite of nodePushUp.
 *
 *                               D
 *            ^                  ^
@@ -284,46 +286,47 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
  TreeHead& tree = m_frag.m_tree;
  const unsigned occup = getOccup();
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
  ndbrequire(occup <= tree.m_maxOccup && pos < occup);
  ScanOpPtr scanPtr;
  // move scans whose entry disappears
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
    if (scanPos.m_pos == pos) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popDown pos=" << pos << " " << node << endl;
      }
#endif
      m_tux.scanNext(signal, scanPtr);
      scanNext(signal, scanPtr);
    }
    scanPtr.i = nextPtrI;
  }
  // fix other scans
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_pos != pos);
    if (scanPos.m_pos > pos) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popDown pos=" << pos << " " << node << endl;
      }
#endif
      scanPos.m_pos--;
@@ -331,7 +334,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
  TreeEnt* const entList = tree.getEntList(m_node);
  TreeEnt* const entList = tree.getEntList(node.m_node);
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  ent = tmpList[pos];
@@ -340,12 +343,12 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
    tmpList[i] = tmpList[i + 1];
  }
  entList[0] = entList[occup - 1];
  setOccup(occup - 1);
  node.setOccup(occup - 1);
  // fix prefixes
  if (occup != 1 && pos == 0)
    m_tux.setNodePref(signal, m_frag, *this, 0);
    setNodePref(signal, node, 0);
  if (occup != 1 && pos == occup - 1)
    m_tux.setNodePref(signal, m_frag, *this, 1);
    setNodePref(signal, node, 1);
}

/*
@@ -358,47 +361,48 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
  TreeHead& tree = m_frag.m_tree;
  const unsigned occup = getOccup();
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
  ndbrequire(occup <= tree.m_maxOccup && pos < occup);
  ScanOpPtr scanPtr;
  // move scans whose entry disappears
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
    if (scanPos.m_pos == 0) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At pushDown pos=" << pos << " " << node << endl;
      }
#endif
      // here we may miss a valid entry "X"  XXX known bug
      m_tux.scanNext(signal, scanPtr);
      scanNext(signal, scanPtr);
    }
    scanPtr.i = nextPtrI;
  }
  // fix other scans
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_pos != 0);
    if (scanPos.m_pos <= pos) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At pushDown pos=" << pos << " " << node << endl;
      }
#endif
      scanPos.m_pos--;
@@ -406,7 +410,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
  TreeEnt* const entList = tree.getEntList(m_node);
  TreeEnt* const entList = tree.getEntList(node.m_node);
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  TreeEnt oldMin = tmpList[0];
@@ -419,15 +423,15 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
  entList[0] = entList[occup];
  // fix prefixes
  if (true)
    m_tux.setNodePref(signal, m_frag, *this, 0);
    setNodePref(signal, node, 0);
  if (occup == 1 || pos == occup - 1)
    m_tux.setNodePref(signal, m_frag, *this, 1);
    setNodePref(signal, node, 1);
}

/*
 * Remove and return entry at position.  Move entries less than the
 * removed one to the right.  Replace min entry by the input entry.
 * This is the opposite of pushDown.
 * This is the opposite of nodePushDown.
 *
 *      X                        D
 *      v     ^                  ^
@@ -435,47 +439,48 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
 *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
 */
void
Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
  TreeHead& tree = m_frag.m_tree;
  const unsigned occup = getOccup();
  Frag& frag = node.m_frag;
  TreeHead& tree = frag.m_tree;
  const unsigned occup = node.getOccup();
  ndbrequire(occup <= tree.m_maxOccup && pos < occup);
  ScanOpPtr scanPtr;
  // move scans whose entry disappears
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
    if (scanPos.m_pos == pos) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popUp pos=" << pos << " " << node << endl;
      }
#endif
      // here we may miss a valid entry "X"  XXX known bug
      m_tux.scanNext(signal, scanPtr);
      scanNext(signal, scanPtr);
    }
    scanPtr.i = nextPtrI;
  }
  // fix other scans
  scanPtr.i = getNodeScan();
  scanPtr.i = node.getNodeScan();
  while (scanPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(scanPtr);
    c_scanOpPool.getPtr(scanPtr);
    TreePos& scanPos = scanPtr.p->m_scanPos;
    ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
    ndbrequire(scanPos.m_pos != pos);
    if (scanPos.m_pos < pos) {
      jam();
#ifdef VM_TRACE
      if (m_tux.debugFlags & m_tux.DebugScan) {
        m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl;
      if (debugFlags & DebugScan) {
        debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
        debugOut << "At popUp pos=" << pos << " " << node << endl;
      }
#endif
      scanPos.m_pos++;
@@ -483,7 +488,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
    scanPtr.i = scanPtr.p->m_nodeScan;
  }
  // fix node
  TreeEnt* const entList = tree.getEntList(m_node);
  TreeEnt* const entList = tree.getEntList(node.m_node);
  entList[occup] = entList[0];
  TreeEnt* const tmpList = entList + 1;
  TreeEnt newMin = ent;
@@ -496,9 +501,9 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
  entList[0] = entList[occup];
  // fix prefixes
  if (true)
    m_tux.setNodePref(signal, m_frag, *this, 0);
    setNodePref(signal, node, 0);
  if (occup == 1 || pos == occup - 1)
    m_tux.setNodePref(signal, m_frag, *this, 1);
    setNodePref(signal, node, 1);
}

/*
@@ -506,14 +511,15 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
 * after the max (i=1).  XXX can be optimized
 */
void
Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i)
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
{
  Frag& frag = dstNode.m_frag;
  TreeHead& tree = frag.m_tree;
  ndbrequire(i <= 1);
  TreeHead& tree = m_frag.m_tree;
  while (getOccup() < tree.m_maxOccup && nodePtr.p->getOccup() != 0) {
  while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
    TreeEnt ent;
    nodePtr.p->popDown(signal, i == 0 ? nodePtr.p->getOccup() - 1 : 0, ent);
    pushUp(signal, i == 0 ? 0 : getOccup(), ent);
    nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
    nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
  }
}

@@ -522,50 +528,50 @@ Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i)
 * ordering does not matter.
 */
void
Dbtux::NodeHandle::linkScan(Dbtux::ScanOpPtr scanPtr)
Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
#ifdef VM_TRACE
  if (m_tux.debugFlags & m_tux.DebugScan) {
    m_tux.debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
    m_tux.debugOut << "To node " << *this << endl;
  if (debugFlags & DebugScan) {
    debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
    debugOut << "To node " << node << endl;
  }
#endif
  ndbrequire(! islinkScan(scanPtr) && scanPtr.p->m_nodeScan == RNIL);
  scanPtr.p->m_nodeScan = getNodeScan();
  setNodeScan(scanPtr.i);
  ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL);
  scanPtr.p->m_nodeScan = node.getNodeScan();
  node.setNodeScan(scanPtr.i);
}

/*
 * Unlink a scan from the list under the node.
 */
void
Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr)
Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
#ifdef VM_TRACE
  if (m_tux.debugFlags & m_tux.DebugScan) {
    m_tux.debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
    m_tux.debugOut << "From node " << *this << endl;
  if (debugFlags & DebugScan) {
    debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
    debugOut << "From node " << node << endl;
  }
#endif
  Dbtux::ScanOpPtr currPtr;
  currPtr.i = getNodeScan();
  Dbtux::ScanOpPtr prevPtr;
  ScanOpPtr currPtr;
  currPtr.i = node.getNodeScan();
  ScanOpPtr prevPtr;
  prevPtr.i = RNIL;
  while (true) {
    jam();
    m_tux.c_scanOpPool.getPtr(currPtr);
    c_scanOpPool.getPtr(currPtr);
    Uint32 nextPtrI = currPtr.p->m_nodeScan;
    if (currPtr.i == scanPtr.i) {
      jam();
      if (prevPtr.i == RNIL) {
        setNodeScan(nextPtrI);
        node.setNodeScan(nextPtrI);
      } else {
        jam();
        prevPtr.p->m_nodeScan = nextPtrI;
      }
      scanPtr.p->m_nodeScan = RNIL;
      // check for duplicates
      ndbrequire(! islinkScan(scanPtr));
      ndbrequire(! islinkScan(node, scanPtr));
      return;
    }
    prevPtr = currPtr;
@@ -577,13 +583,13 @@ Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr)
 * Check if a scan is linked to this node.  Only for ndbrequire.
 */
bool
Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr)
Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
  Dbtux::ScanOpPtr currPtr;
  currPtr.i = getNodeScan();
  ScanOpPtr currPtr;
  currPtr.i = node.getNodeScan();
  while (currPtr.i != RNIL) {
    jam();
    m_tux.c_scanOpPool.getPtr(currPtr);
    c_scanOpPool.getPtr(currPtr);
    if (currPtr.i == scanPtr.i) {
      jam();
      return true;
@@ -594,7 +600,7 @@ Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr)
}

void
Dbtux::NodeHandle::progError(int line, int cause, const char* extra)
Dbtux::NodeHandle::progError(int line, int cause, const char* file)
{
  m_tux.progError(line, cause, extra);
  ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line);
}
+8 −8
Original line number Diff line number Diff line
@@ -280,7 +280,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
      const TupLoc loc = scan.m_scanPos.m_loc;
      NodeHandlePtr nodePtr;
      selectNode(signal, frag, nodePtr, loc, AccHead);
      nodePtr.p->unlinkScan(scanPtr);
      unlinkScan(*nodePtr.p, scanPtr);
      scan.m_scanPos.m_loc = NullTupLoc;
    }
    if (scan.m_lockwait) {
@@ -763,7 +763,7 @@ loop: {
        pos.m_dir = 3;
        scan.m_scanPos = pos;
        scan.m_state = ScanOp::Next;
        nodePtr.p->linkScan(scanPtr);
        linkScan(*nodePtr.p, scanPtr);
        return;
      }
      if (i == 1 && ret > 0) {
@@ -779,7 +779,7 @@ loop: {
        pos.m_dir = 1;
        scan.m_scanPos = pos;
        scan.m_state = ScanOp::Next;
        nodePtr.p->linkScan(scanPtr);
        linkScan(*nodePtr.p, scanPtr);
        return;
      }
    }
@@ -808,7 +808,7 @@ loop: {
        pos.m_dir = 3;
        scan.m_scanPos = pos;
        scan.m_state = ScanOp::Next;
        nodePtr.p->linkScan(scanPtr);
        linkScan(*nodePtr.p, scanPtr);
        return;
      }
    }
@@ -870,7 +870,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
  // get and remember original node
  NodeHandlePtr origNodePtr;
  selectNode(signal, frag, origNodePtr, pos.m_loc, AccHead);
  ndbrequire(origNodePtr.p->islinkScan(scanPtr));
  ndbrequire(islinkScan(*origNodePtr.p, scanPtr));
  // current node in loop
  NodeHandlePtr nodePtr = origNodePtr;
  while (true) {
@@ -977,13 +977,13 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
    ndbrequire(pos.m_loc == nodePtr.p->m_loc);
    if (origNodePtr.i != nodePtr.i) {
      jam();
      origNodePtr.p->unlinkScan(scanPtr);
      nodePtr.p->linkScan(scanPtr);
      unlinkScan(*origNodePtr.p, scanPtr);
      linkScan(*nodePtr.p, scanPtr);
    }
  } else if (scan.m_state == ScanOp::Last) {
    jam();
    ndbrequire(pos.m_loc == NullTupLoc);
    origNodePtr.p->unlinkScan(scanPtr);
    unlinkScan(*origNodePtr.p, scanPtr);
  } else {
    ndbrequire(false);
  }
+11 −11

File changed.

Preview size limit exceeded, changes collapsed.

Loading