Loading ndb/src/kernel/blocks/dbtux/Dbtux.hpp +22 −17 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ #include <AttributeHeader.hpp> #include <ArrayPool.hpp> #include <DataBuffer.hpp> #include <DLFifoList.hpp> #include <md5_hash.hpp> // big brother Loading Loading @@ -334,6 +335,18 @@ private: typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool; ScanBoundPool c_scanBoundPool; // ScanLock struct ScanLock { Uint32 m_accLockOp; union { Uint32 nextPool; Uint32 nextList; }; Uint32 prevList; }; typedef Ptr<ScanLock> ScanLockPtr; ArrayPool<ScanLock> c_scanLockPool; /* * Scan operation. * Loading Loading @@ -379,6 +392,8 @@ private: Uint32 m_savePointId; // lock waited for or obtained and not yet passed to LQH Uint32 m_accLockOp; // locks obtained and passed to LQH but not yet returned by LQH DLFifoList<ScanLock>::Head m_accLockOps; Uint8 m_readCommitted; // no locking Uint8 m_lockMode; Uint8 m_descending; Loading @@ -394,13 +409,6 @@ private: Uint32 nextList; }; Uint32 prevList; /* * Locks obtained and passed to LQH but not yet returned by LQH. * The max was increased from 16 to 992 (default 64). Record max * ever used in this scan. TODO fix quadratic behaviour */ Uint32 m_maxAccLockOps; Uint32 m_accLockOps[MaxAccLockOps]; ScanOp(ScanBoundPool& scanBoundPool); }; typedef Ptr<ScanOp> ScanOpPtr; Loading Loading @@ -620,8 +628,9 @@ private: bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); void scanClose(Signal* signal, ScanOpPtr scanPtr); void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void abortAccLockOps(Signal* signal, ScanOpPtr scanPtr); void addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp); void removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp); void releaseScanOp(ScanOpPtr& scanPtr); /* Loading Loading @@ -674,7 +683,8 @@ private: DebugMeta = 1, // log create and drop index DebugMaint = 2, // log maintenance ops DebugTree = 4, // log and check tree after each op DebugScan = 8 // log scans DebugScan = 8, // log scans DebugLock = 16 // log ACC locks }; STATIC_CONST( DataFillByte = 0xa2 ); STATIC_CONST( NodeFillByte = 0xa4 ); Loading Loading @@ -938,6 +948,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : m_transId2(0), m_savePointId(0), m_accLockOp(RNIL), m_accLockOps(), m_readCommitted(0), m_lockMode(0), m_descending(0), Loading @@ -945,18 +956,12 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : m_boundMax(scanBoundPool), m_scanPos(), m_scanEnt(), m_nodeScan(RNIL), m_maxAccLockOps(0) m_nodeScan(RNIL) { m_bound[0] = &m_boundMin; m_bound[1] = &m_boundMax; m_boundCnt[0] = 0; m_boundCnt[1] = 0; #ifdef VM_TRACE for (unsigned i = 0; i < MaxAccLockOps; i++) { m_accLockOps[i] = 0x1f1f1f1f; } #endif } // Dbtux::Index Loading ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +11 −5 Original line number Diff line number Diff line Loading @@ -330,6 +330,7 @@ operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr) NdbOut& operator<<(NdbOut& out, const Dbtux::ScanOp& scan) { Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX); out << "[ScanOp " << hex << &scan; out << " [state " << dec << scan.m_state << "]"; out << " [lockwait " << dec << scan.m_lockwait << "]"; Loading @@ -339,9 +340,15 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) out << " [savePointId " << dec << scan.m_savePointId << "]"; out << " [accLockOp " << hex << scan.m_accLockOp << "]"; out << " [accLockOps"; for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (scan.m_accLockOps[i] != RNIL) out << " " << hex << scan.m_accLockOps[i]; { DLFifoList<Dbtux::ScanLock>::Head head = scan.m_accLockOps; LocalDLFifoList<Dbtux::ScanLock> list(tux->c_scanLockPool, head); Dbtux::ScanLockPtr lockPtr; list.first(lockPtr); while (lockPtr.i != RNIL) { out << " " << hex << lockPtr.p->m_accLockOp; list.next(lockPtr); } } out << "]"; out << " [readCommitted " << dec << scan.m_readCommitted << "]"; Loading @@ -367,13 +374,12 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) NdbOut& operator<<(NdbOut& out, const Dbtux::Index& index) { Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX); out << "[Index " << hex << &index; out << " [tableId " << dec << index.m_tableId << "]"; out << " [numFrags " << dec << index.m_numFrags << "]"; for (unsigned i = 0; i < index.m_numFrags; i++) { out << " [frag " << dec << i << " "; // dangerous and wrong Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX); const Dbtux::Frag& frag = *tux->c_fragPool.getPtr(index.m_fragPtrI[i]); out << frag; out << "]"; Loading ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +4 −0 Original line number Diff line number Diff line Loading @@ -159,6 +159,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) Uint32 nFragment; Uint32 nAttribute; Uint32 nScanOp; Uint32 nScanBatch; const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); Loading @@ -168,9 +169,11 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_FRAGMENT, &nFragment)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_ATTRIBUTE, &nAttribute)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch)); const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize; const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4; const Uint32 nScanLock = nScanOp * nScanBatch; c_indexPool.setSize(nIndex); c_fragPool.setSize(nFragment); Loading @@ -178,6 +181,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) c_fragOpPool.setSize(MaxIndexFragments); c_scanOpPool.setSize(nScanOp); c_scanBoundPool.setSize(nScanBoundWords); c_scanLockPool.setSize(nScanLock); /* * Index id is physical array index. We seize and initialize all * index records now. Loading ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +69 −46 Original line number Diff line number Diff line Loading @@ -312,7 +312,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); removeAccLockOp(scan, req->accOperationPtr); removeAccLockOp(scanPtr, req->accOperationPtr); } if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) { jam(); Loading Loading @@ -466,7 +466,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_state = ScanOp::Locked; scan.m_accLockOp = lockReq->accOpPtr; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl; } #endif Loading @@ -478,7 +478,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_lockwait = true; scan.m_accLockOp = lockReq->accOpPtr; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -534,7 +534,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) if (accLockOp != RNIL) { scan.m_accLockOp = RNIL; // remember it until LQH unlocks it addAccLockOp(scan, accLockOp); addAccLockOp(scanPtr, accLockOp); } else { ndbrequire(scan.m_readCommitted); // operation RNIL in LQH would signal no tuple returned Loading Loading @@ -589,7 +589,7 @@ Dbtux::execACCKEYCONF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -634,7 +634,7 @@ Dbtux::execACCKEYREF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -678,7 +678,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -1014,18 +1014,9 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ScanOp& scan = *scanPtr.p; ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); // unlock all not unlocked by LQH for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (scan.m_accLockOps[i] != RNIL) { if (! scan.m_accLockOps.isEmpty()) { jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOps[i]; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOps[i] = RNIL; } abortAccLockOps(signal, scanPtr); } // send conf NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); Loading @@ -1039,44 +1030,76 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) } void Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp) Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr) { ndbrequire(accLockOp != RNIL); Uint32* list = scan.m_accLockOps; bool ok = false; for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { ndbrequire(list[i] != accLockOp); if (! ok && list[i] == RNIL) { list[i] = accLockOp; ok = true; // continue check for duplicates ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl; } #endif LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); ScanLockPtr lockPtr; while (list.first(lockPtr)) { jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = lockPtr.p->m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); list.release(lockPtr); } if (! ok) { unsigned i = scan.m_maxAccLockOps; if (i < MaxAccLockOps) { list[i] = accLockOp; ok = true; scan.m_maxAccLockOps = i + 1; } void Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp) { ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Add lock " << hex << accLockOp << dec << " to scan " << scanPtr.i << " " << scan << endl; } #endif LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); ScanLockPtr lockPtr; #ifdef VM_TRACE list.first(lockPtr); while (lockPtr.i != RNIL) { ndbrequire(lockPtr.p->m_accLockOp != accLockOp); list.next(lockPtr); } #endif bool ok = list.seize(lockPtr); ndbrequire(ok); ndbrequire(accLockOp != RNIL); lockPtr.p->m_accLockOp = accLockOp; } void Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp) Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp) { ndbrequire(accLockOp != RNIL); Uint32* list = scan.m_accLockOps; bool ok = false; for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (list[i] == accLockOp) { list[i] = RNIL; ok = true; ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Remove lock " << hex << accLockOp << dec << " from scan " << scanPtr.i << " " << scan << endl; } #endif LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); ScanLockPtr lockPtr; list.first(lockPtr); while (lockPtr.i != RNIL) { if (lockPtr.p->m_accLockOp == accLockOp) { jam(); break; } list.next(lockPtr); } ndbrequire(ok); ndbrequire(lockPtr.i != RNIL); list.release(lockPtr); } /* Loading ndb/src/kernel/vm/DLFifoList.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,8 @@ public: Head(); Uint32 firstItem; Uint32 lastItem; inline bool isEmpty() const { return firstItem == RNIL;} }; DLFifoList(ArrayPool<T> & thePool); Loading Loading
ndb/src/kernel/blocks/dbtux/Dbtux.hpp +22 −17 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ #include <AttributeHeader.hpp> #include <ArrayPool.hpp> #include <DataBuffer.hpp> #include <DLFifoList.hpp> #include <md5_hash.hpp> // big brother Loading Loading @@ -334,6 +335,18 @@ private: typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool; ScanBoundPool c_scanBoundPool; // ScanLock struct ScanLock { Uint32 m_accLockOp; union { Uint32 nextPool; Uint32 nextList; }; Uint32 prevList; }; typedef Ptr<ScanLock> ScanLockPtr; ArrayPool<ScanLock> c_scanLockPool; /* * Scan operation. * Loading Loading @@ -379,6 +392,8 @@ private: Uint32 m_savePointId; // lock waited for or obtained and not yet passed to LQH Uint32 m_accLockOp; // locks obtained and passed to LQH but not yet returned by LQH DLFifoList<ScanLock>::Head m_accLockOps; Uint8 m_readCommitted; // no locking Uint8 m_lockMode; Uint8 m_descending; Loading @@ -394,13 +409,6 @@ private: Uint32 nextList; }; Uint32 prevList; /* * Locks obtained and passed to LQH but not yet returned by LQH. * The max was increased from 16 to 992 (default 64). Record max * ever used in this scan. TODO fix quadratic behaviour */ Uint32 m_maxAccLockOps; Uint32 m_accLockOps[MaxAccLockOps]; ScanOp(ScanBoundPool& scanBoundPool); }; typedef Ptr<ScanOp> ScanOpPtr; Loading Loading @@ -620,8 +628,9 @@ private: bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); void scanClose(Signal* signal, ScanOpPtr scanPtr); void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void abortAccLockOps(Signal* signal, ScanOpPtr scanPtr); void addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp); void removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp); void releaseScanOp(ScanOpPtr& scanPtr); /* Loading Loading @@ -674,7 +683,8 @@ private: DebugMeta = 1, // log create and drop index DebugMaint = 2, // log maintenance ops DebugTree = 4, // log and check tree after each op DebugScan = 8 // log scans DebugScan = 8, // log scans DebugLock = 16 // log ACC locks }; STATIC_CONST( DataFillByte = 0xa2 ); STATIC_CONST( NodeFillByte = 0xa4 ); Loading Loading @@ -938,6 +948,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : m_transId2(0), m_savePointId(0), m_accLockOp(RNIL), m_accLockOps(), m_readCommitted(0), m_lockMode(0), m_descending(0), Loading @@ -945,18 +956,12 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : m_boundMax(scanBoundPool), m_scanPos(), m_scanEnt(), m_nodeScan(RNIL), m_maxAccLockOps(0) m_nodeScan(RNIL) { m_bound[0] = &m_boundMin; m_bound[1] = &m_boundMax; m_boundCnt[0] = 0; m_boundCnt[1] = 0; #ifdef VM_TRACE for (unsigned i = 0; i < MaxAccLockOps; i++) { m_accLockOps[i] = 0x1f1f1f1f; } #endif } // Dbtux::Index Loading
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +11 −5 Original line number Diff line number Diff line Loading @@ -330,6 +330,7 @@ operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr) NdbOut& operator<<(NdbOut& out, const Dbtux::ScanOp& scan) { Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX); out << "[ScanOp " << hex << &scan; out << " [state " << dec << scan.m_state << "]"; out << " [lockwait " << dec << scan.m_lockwait << "]"; Loading @@ -339,9 +340,15 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) out << " [savePointId " << dec << scan.m_savePointId << "]"; out << " [accLockOp " << hex << scan.m_accLockOp << "]"; out << " [accLockOps"; for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (scan.m_accLockOps[i] != RNIL) out << " " << hex << scan.m_accLockOps[i]; { DLFifoList<Dbtux::ScanLock>::Head head = scan.m_accLockOps; LocalDLFifoList<Dbtux::ScanLock> list(tux->c_scanLockPool, head); Dbtux::ScanLockPtr lockPtr; list.first(lockPtr); while (lockPtr.i != RNIL) { out << " " << hex << lockPtr.p->m_accLockOp; list.next(lockPtr); } } out << "]"; out << " [readCommitted " << dec << scan.m_readCommitted << "]"; Loading @@ -367,13 +374,12 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) NdbOut& operator<<(NdbOut& out, const Dbtux::Index& index) { Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX); out << "[Index " << hex << &index; out << " [tableId " << dec << index.m_tableId << "]"; out << " [numFrags " << dec << index.m_numFrags << "]"; for (unsigned i = 0; i < index.m_numFrags; i++) { out << " [frag " << dec << i << " "; // dangerous and wrong Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX); const Dbtux::Frag& frag = *tux->c_fragPool.getPtr(index.m_fragPtrI[i]); out << frag; out << "]"; Loading
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +4 −0 Original line number Diff line number Diff line Loading @@ -159,6 +159,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) Uint32 nFragment; Uint32 nAttribute; Uint32 nScanOp; Uint32 nScanBatch; const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); Loading @@ -168,9 +169,11 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_FRAGMENT, &nFragment)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_ATTRIBUTE, &nAttribute)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch)); const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize; const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4; const Uint32 nScanLock = nScanOp * nScanBatch; c_indexPool.setSize(nIndex); c_fragPool.setSize(nFragment); Loading @@ -178,6 +181,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) c_fragOpPool.setSize(MaxIndexFragments); c_scanOpPool.setSize(nScanOp); c_scanBoundPool.setSize(nScanBoundWords); c_scanLockPool.setSize(nScanLock); /* * Index id is physical array index. We seize and initialize all * index records now. Loading
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +69 −46 Original line number Diff line number Diff line Loading @@ -312,7 +312,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); removeAccLockOp(scan, req->accOperationPtr); removeAccLockOp(scanPtr, req->accOperationPtr); } if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) { jam(); Loading Loading @@ -466,7 +466,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_state = ScanOp::Locked; scan.m_accLockOp = lockReq->accOpPtr; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl; } #endif Loading @@ -478,7 +478,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_lockwait = true; scan.m_accLockOp = lockReq->accOpPtr; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -534,7 +534,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) if (accLockOp != RNIL) { scan.m_accLockOp = RNIL; // remember it until LQH unlocks it addAccLockOp(scan, accLockOp); addAccLockOp(scanPtr, accLockOp); } else { ndbrequire(scan.m_readCommitted); // operation RNIL in LQH would signal no tuple returned Loading Loading @@ -589,7 +589,7 @@ Dbtux::execACCKEYCONF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -634,7 +634,7 @@ Dbtux::execACCKEYREF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -678,7 +678,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) { debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl; } #endif Loading Loading @@ -1014,18 +1014,9 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ScanOp& scan = *scanPtr.p; ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); // unlock all not unlocked by LQH for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (scan.m_accLockOps[i] != RNIL) { if (! scan.m_accLockOps.isEmpty()) { jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOps[i]; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOps[i] = RNIL; } abortAccLockOps(signal, scanPtr); } // send conf NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); Loading @@ -1039,44 +1030,76 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) } void Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp) Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr) { ndbrequire(accLockOp != RNIL); Uint32* list = scan.m_accLockOps; bool ok = false; for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { ndbrequire(list[i] != accLockOp); if (! ok && list[i] == RNIL) { list[i] = accLockOp; ok = true; // continue check for duplicates ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl; } #endif LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); ScanLockPtr lockPtr; while (list.first(lockPtr)) { jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = lockPtr.p->m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); list.release(lockPtr); } if (! ok) { unsigned i = scan.m_maxAccLockOps; if (i < MaxAccLockOps) { list[i] = accLockOp; ok = true; scan.m_maxAccLockOps = i + 1; } void Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp) { ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Add lock " << hex << accLockOp << dec << " to scan " << scanPtr.i << " " << scan << endl; } #endif LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); ScanLockPtr lockPtr; #ifdef VM_TRACE list.first(lockPtr); while (lockPtr.i != RNIL) { ndbrequire(lockPtr.p->m_accLockOp != accLockOp); list.next(lockPtr); } #endif bool ok = list.seize(lockPtr); ndbrequire(ok); ndbrequire(accLockOp != RNIL); lockPtr.p->m_accLockOp = accLockOp; } void Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp) Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp) { ndbrequire(accLockOp != RNIL); Uint32* list = scan.m_accLockOps; bool ok = false; for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (list[i] == accLockOp) { list[i] = RNIL; ok = true; ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Remove lock " << hex << accLockOp << dec << " from scan " << scanPtr.i << " " << scan << endl; } #endif LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); ScanLockPtr lockPtr; list.first(lockPtr); while (lockPtr.i != RNIL) { if (lockPtr.p->m_accLockOp == accLockOp) { jam(); break; } list.next(lockPtr); } ndbrequire(ok); ndbrequire(lockPtr.i != RNIL); list.release(lockPtr); } /* Loading
ndb/src/kernel/vm/DLFifoList.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,8 @@ public: Head(); Uint32 firstItem; Uint32 lastItem; inline bool isEmpty() const { return firstItem == RNIL;} }; DLFifoList(ArrayPool<T> & thePool); Loading