Loading ndb/src/kernel/blocks/dblqh/Dblqh.hpp +5 −0 Original line number Diff line number Diff line Loading @@ -550,6 +550,11 @@ public: UintR scanErrorCounter; UintR scanLocalFragid; UintR scanSchemaVersion; /** * This is _always_ main table, even in range scan * in which case scanTcrec->fragmentptr is different */ Uint32 fragPtrI; UintR scanStoredProcId; ScanState scanState; Loading ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +21 −20 Original line number Diff line number Diff line Loading @@ -7703,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){ jam(); scanptr.i = scan_ptr_i; c_scanRecordPool.getPtr(scanptr); fragptr.i = tcConnectptr.p->fragmentptr; ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); finishScanrec(signal); releaseScanrec(signal); tcConnectptr.p->transactionState = TcConnectionrec::IDLE; Loading Loading @@ -8570,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) /** * Used for scan take over */ { FragrecordPtr tFragPtr; tFragPtr.i = fragptr.p->tableFragptr; ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); scanptr.p->fragPtrI = fragptr.p->tableFragptr; } /** * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 Loading @@ -8582,7 +8587,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); stop += start; Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); Uint32 free = fragptr.p->m_scanNumberMask.find(start); if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){ jam(); Loading @@ -8597,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) */ scanptr.p->scanState = ScanRecord::IN_QUEUE; LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, tFragPtr.p->m_queuedScans); fragptr.p->m_queuedScans); queue.add(scanptr); return ZOK; } scanptr.p->scanNumber = free; tFragPtr.p->m_scanNumberMask.clear(free);// Update mask fragptr.p->m_scanNumberMask.clear(free);// Update mask LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans); LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans); active.add(scanptr); if(scanptr.p->scanKeyinfoFlag){ jam(); Loading Loading @@ -8666,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal) { release_acc_ptr_list(scanptr.p); FragrecordPtr tFragPtr; tFragPtr.i = scanptr.p->fragPtrI; ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, tFragPtr.p->m_queuedScans); fragptr.p->m_queuedScans); if(scanptr.p->scanState == ScanRecord::IN_QUEUE){ jam(); Loading @@ -8689,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal) ndbrequire(tmp.p == scanptr.p); } LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans); LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans); scans.release(scanptr); const Uint32 scanNumber = scanptr.p->scanNumber; ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber)); ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber)); ScanRecordPtr restart; /** Loading @@ -8701,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal) */ if(scanNumber == NR_ScanNo || !queue.first(restart)){ jam(); tFragPtr.p->m_scanNumberMask.set(scanNumber); fragptr.p->m_scanNumberMask.set(scanNumber); return; } if(ERROR_INSERTED(5034)){ jam(); tFragPtr.p->m_scanNumberMask.set(scanNumber); fragptr.p->m_scanNumberMask.set(scanNumber); return; } Loading ndb/src/ndbapi/NdbConnection.cpp +5 −2 Original line number Diff line number Diff line Loading @@ -1122,8 +1122,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, if (indexTable != 0){ NdbIndexScanOperation* tOp = getNdbScanOperation((NdbTableImpl *) indexTable); if(tOp) { tOp->m_currentTable = table; if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; tOp->m_cursor_type = NdbScanOperation::IndexCursor; } return tOp; } else { setOperationErrorCodeAbort(theNdb->theError.code); Loading Loading
ndb/src/kernel/blocks/dblqh/Dblqh.hpp +5 −0 Original line number Diff line number Diff line Loading @@ -550,6 +550,11 @@ public: UintR scanErrorCounter; UintR scanLocalFragid; UintR scanSchemaVersion; /** * This is _always_ main table, even in range scan * in which case scanTcrec->fragmentptr is different */ Uint32 fragPtrI; UintR scanStoredProcId; ScanState scanState; Loading
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +21 −20 Original line number Diff line number Diff line Loading @@ -7703,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){ jam(); scanptr.i = scan_ptr_i; c_scanRecordPool.getPtr(scanptr); fragptr.i = tcConnectptr.p->fragmentptr; ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); finishScanrec(signal); releaseScanrec(signal); tcConnectptr.p->transactionState = TcConnectionrec::IDLE; Loading Loading @@ -8570,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) /** * Used for scan take over */ { FragrecordPtr tFragPtr; tFragPtr.i = fragptr.p->tableFragptr; ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); scanptr.p->fragPtrI = fragptr.p->tableFragptr; } /** * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 Loading @@ -8582,7 +8587,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); stop += start; Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); Uint32 free = fragptr.p->m_scanNumberMask.find(start); if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){ jam(); Loading @@ -8597,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) */ scanptr.p->scanState = ScanRecord::IN_QUEUE; LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, tFragPtr.p->m_queuedScans); fragptr.p->m_queuedScans); queue.add(scanptr); return ZOK; } scanptr.p->scanNumber = free; tFragPtr.p->m_scanNumberMask.clear(free);// Update mask fragptr.p->m_scanNumberMask.clear(free);// Update mask LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans); LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans); active.add(scanptr); if(scanptr.p->scanKeyinfoFlag){ jam(); Loading Loading @@ -8666,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal) { release_acc_ptr_list(scanptr.p); FragrecordPtr tFragPtr; tFragPtr.i = scanptr.p->fragPtrI; ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, tFragPtr.p->m_queuedScans); fragptr.p->m_queuedScans); if(scanptr.p->scanState == ScanRecord::IN_QUEUE){ jam(); Loading @@ -8689,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal) ndbrequire(tmp.p == scanptr.p); } LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans); LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans); scans.release(scanptr); const Uint32 scanNumber = scanptr.p->scanNumber; ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber)); ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber)); ScanRecordPtr restart; /** Loading @@ -8701,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal) */ if(scanNumber == NR_ScanNo || !queue.first(restart)){ jam(); tFragPtr.p->m_scanNumberMask.set(scanNumber); fragptr.p->m_scanNumberMask.set(scanNumber); return; } if(ERROR_INSERTED(5034)){ jam(); tFragPtr.p->m_scanNumberMask.set(scanNumber); fragptr.p->m_scanNumberMask.set(scanNumber); return; } Loading
ndb/src/ndbapi/NdbConnection.cpp +5 −2 Original line number Diff line number Diff line Loading @@ -1122,8 +1122,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, if (indexTable != 0){ NdbIndexScanOperation* tOp = getNdbScanOperation((NdbTableImpl *) indexTable); if(tOp) { tOp->m_currentTable = table; if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; tOp->m_cursor_type = NdbScanOperation::IndexCursor; } return tOp; } else { setOperationErrorCodeAbort(theNdb->theError.code); Loading