Loading ndb/src/kernel/blocks/dblqh/Dblqh.hpp +19 −0 Original line number Diff line number Diff line Loading @@ -2925,4 +2925,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); } inline void Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) { if (index == 0) { acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; } else { Uint32 attr_buf_index, attr_buf_rec; AttrbufPtr regAttrPtr; jam(); attr_buf_rec= (index + 31) / 32; attr_buf_index= (index - 1) & 31; regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; } } #endif ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +27 −27 Original line number Diff line number Diff line Loading @@ -7058,10 +7058,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) // Update timer on tcConnectRecord tcConnectptr.p->tcTimer = cLqhTimeOutCount; init_acc_ptr_list(scanptr.p); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanNextLoopLab(signal); }//Dblqh::continueScanNextReqLab() Loading Loading @@ -7260,22 +7257,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal) tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); releaseActiveFrag(signal); if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) { if ((scanptr.p->scanErrorCounter > 0) || (scanptr.p->scanCompletedStatus == ZTRUE)) { jam(); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { jam(); closeScanLab(signal); return; } else { jam(); /* We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only come here when scanHoldLock == ZTRUE * We came here after releasing locks after * receiving SCAN_NEXTREQ from TC. We only come here * when scanHoldLock == ZTRUE */ scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes = 0; continueScanNextReqLab(signal); }//if } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) { Loading Loading @@ -7362,25 +7369,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) scanP->scan_acc_index = 0; } inline void Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) { if (index == 0) { acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; } else { Uint32 attr_buf_index, attr_buf_rec; AttrbufPtr regAttrPtr; jam(); attr_buf_rec= (index + 31) / 32; attr_buf_index= (index - 1) & 31; regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; } } Uint32 Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index, Loading Loading @@ -7904,6 +7892,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal) /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. ************************************************************ */ if (!scanptr.p->scanLockHold) { jam(); closeScanLab(signal); return; } if (scanptr.p->scanCompletedStatus == ZTRUE) { if ((scanptr.p->scanLockHold == ZTRUE) && (scanptr.p->m_curr_batch_size_rows > 0)) { Loading Loading @@ -8404,8 +8399,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ScanFragRef::SignalLength, JBB); } else { jam(); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); }//if finishScanrec(signal); Loading Loading @@ -8809,6 +8802,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) conf->total_len= total_len; sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, signal, ScanFragConf::SignalLength, JBB); if(!scanptr.p->scanLockHold) { jam(); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; } }//Dblqh::sendScanFragConf() /* ######################################################################### */ Loading ndb/src/kernel/blocks/dbtc/Dbtc.hpp +6 −5 Original line number Diff line number Diff line Loading @@ -1054,9 +1054,8 @@ public: // Id of the ScanRecord this fragment scan belongs to Uint32 scanRec; // The maximum number of operations that can be scanned before // returning to TC Uint16 scanFragConcurrency; // The value of fragmentCompleted in the last received SCAN_FRAGCONF Uint8 m_scan_frag_conf_status; inline void startFragTimer(Uint32 timeVal){ scanFragTimer = timeVal; Loading Loading @@ -1193,8 +1192,10 @@ public: // Number of operation records per scanned fragment // Number of operations in first batch // Max number of bytes per batch Uint16 noOprecPerFrag; Uint16 first_batch_size; union { Uint16 first_batch_size_rows; Uint16 batch_size_rows; }; Uint32 batch_byte_size; Uint32 scanRequestInfo; // ScanFrag format Loading ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +52 −28 Original line number Diff line number Diff line Loading @@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanTableref = tabptr.i; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; scanptr.p->noOprecPerFrag = noOprecPerFrag; scanptr.p->first_batch_size= scanTabReq->first_batch_size; scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size; scanptr.p->batch_byte_size = scanTabReq->batch_byte_size; scanptr.p->batch_size_rows = noOprecPerFrag; Uint32 tmp = 0; const UintR ri = scanTabReq->requestInfo; Loading @@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ndbrequire(list.seize(ptr)); ptr.p->scanRec = scanptr.i; ptr.p->scanFragId = 0; ptr.p->scanFragConcurrency = noOprecPerFrag; ptr.p->m_apiPtr = cdata[i]; }//for Loading Loading @@ -9141,6 +9140,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; const Uint32 noCompletedOps = conf->completedOps; const Uint32 status = conf->fragmentCompleted; scanFragptr.i = conf->senderData; c_scan_frag_pool.getPtr(scanFragptr); Loading @@ -9163,11 +9163,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); const Uint32 status = conf->fragmentCompleted; if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ jam(); if(status == ZFALSE){ if(status == 0){ /** * We have started closing = we sent a close -> ignore this */ Loading @@ -9184,11 +9182,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) return; } if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ if(noCompletedOps == 0 && status != 0 && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ /** * Start on next fragment */ ndbrequire(noCompletedOps == 0); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; scanFragptr.p->startFragTimer(ctcTimer); Loading Loading @@ -9218,6 +9216,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) scanptr.p->m_queued_count++; } scanFragptr.p->m_scan_frag_conf_status = status; scanFragptr.p->m_ops = noCompletedOps; scanFragptr.p->m_totalLen = total_len; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; Loading Loading @@ -9330,11 +9329,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) // Copy op ptrs so I dont overwrite them when sending... memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; nextReq->closeFlag = ZFALSE; nextReq->transId1 = apiConnectptr.p->transid[0]; nextReq->transId2 = apiConnectptr.p->transid[1]; nextReq->batch_size_bytes= scanP->batch_byte_size; ScanFragNextReq tmp; tmp.closeFlag = ZFALSE; tmp.transId1 = apiConnectptr.p->transid[0]; tmp.transId2 = apiConnectptr.p->transid[1]; tmp.batch_size_rows = scanP->batch_size_rows; tmp.batch_size_bytes = scanP->batch_byte_size; ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); Loading @@ -9344,15 +9344,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) c_scan_frag_pool.getPtr(scanFragptr); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED); scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; scanFragptr.p->startFragTimer(ctcTimer); scanFragptr.p->m_ops = 0; nextReq->senderData = scanFragptr.i; nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency; if(scanFragptr.p->m_scan_frag_conf_status) { /** * last scan was complete */ jam(); ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++; signal->theData[0] = tcConnectptr.p->dihConnectptr; signal->theData[1] = scanFragptr.i; signal->theData[2] = scanptr.p->scanTableref; signal->theData[3] = scanFragptr.p->scanFragId; sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); } else { jam(); scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend(); * req = tmp; req->senderData = scanFragptr.i; sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); } delivered.remove(scanFragptr); running.add(scanFragptr); }//for Loading Loading @@ -9551,7 +9573,7 @@ void Dbtc::sendScanFragReq(Signal* signal, req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; req->clientOpPtr = scanFragP->m_apiPtr; req->batch_size_rows= scanFragP->scanFragConcurrency; req->batch_size_rows= scanP->batch_size_rows; req->batch_size_bytes= scanP->batch_byte_size; sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); Loading @@ -9574,6 +9596,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ops += 21; } Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId; ScanTabConf * conf = (ScanTabConf*)&signal->theData[0]; conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect; conf->requestInfo = op_count; Loading @@ -9588,17 +9612,18 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ScanFragRecPtr curr = ptr; // Remove while iterating... queued.next(ptr); bool done = curr.p->m_scan_frag_conf_status && --left; * ops++ = curr.p->m_apiPtr; * ops++ = curr.i; * ops++ = done ? RNIL : curr.i; * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; queued.remove(curr); if(curr.p->m_ops > 0){ if(!done){ delivered.add(curr); curr.p->scanFragState = ScanFragRec::DELIVERED; curr.p->stopFragTimer(); } else { (* --ops) = ScanTabConf::EndOfData; ops++; c_scan_frag_pool.release(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); Loading Loading @@ -10424,9 +10449,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sfp.i, sfp.p->scanFragState, sfp.p->scanFragId); infoEvent(" nodeid=%d, concurr=%d, timer=%d", infoEvent(" nodeid=%d, timer=%d", refToNode(sfp.p->lqhBlockref), sfp.p->scanFragConcurrency, sfp.p->scanFragTimer); } Loading Loading @@ -10504,7 +10528,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sp.p->scanAiLength, sp.p->scanParallel, sp.p->scanReceivedOperations, sp.p->noOprecPerFrag); sp.p->batch_size_rows); infoEvent(" schv=%d, tab=%d, sproc=%d", sp.p->scanSchemaVersion, sp.p->scanTableref, Loading ndb/src/kernel/blocks/suma/Suma.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; Loading Loading
ndb/src/kernel/blocks/dblqh/Dblqh.hpp +19 −0 Original line number Diff line number Diff line Loading @@ -2925,4 +2925,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); } inline void Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) { if (index == 0) { acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; } else { Uint32 attr_buf_index, attr_buf_rec; AttrbufPtr regAttrPtr; jam(); attr_buf_rec= (index + 31) / 32; attr_buf_index= (index - 1) & 31; regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; } } #endif
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +27 −27 Original line number Diff line number Diff line Loading @@ -7058,10 +7058,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) // Update timer on tcConnectRecord tcConnectptr.p->tcTimer = cLqhTimeOutCount; init_acc_ptr_list(scanptr.p); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanNextLoopLab(signal); }//Dblqh::continueScanNextReqLab() Loading Loading @@ -7260,22 +7257,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal) tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); releaseActiveFrag(signal); if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) { if ((scanptr.p->scanErrorCounter > 0) || (scanptr.p->scanCompletedStatus == ZTRUE)) { jam(); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { jam(); closeScanLab(signal); return; } else { jam(); /* We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only come here when scanHoldLock == ZTRUE * We came here after releasing locks after * receiving SCAN_NEXTREQ from TC. We only come here * when scanHoldLock == ZTRUE */ scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes = 0; continueScanNextReqLab(signal); }//if } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) { Loading Loading @@ -7362,25 +7369,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) scanP->scan_acc_index = 0; } inline void Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) { if (index == 0) { acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; } else { Uint32 attr_buf_index, attr_buf_rec; AttrbufPtr regAttrPtr; jam(); attr_buf_rec= (index + 31) / 32; attr_buf_index= (index - 1) & 31; regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; } } Uint32 Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index, Loading Loading @@ -7904,6 +7892,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal) /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. ************************************************************ */ if (!scanptr.p->scanLockHold) { jam(); closeScanLab(signal); return; } if (scanptr.p->scanCompletedStatus == ZTRUE) { if ((scanptr.p->scanLockHold == ZTRUE) && (scanptr.p->m_curr_batch_size_rows > 0)) { Loading Loading @@ -8404,8 +8399,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ScanFragRef::SignalLength, JBB); } else { jam(); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); }//if finishScanrec(signal); Loading Loading @@ -8809,6 +8802,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) conf->total_len= total_len; sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, signal, ScanFragConf::SignalLength, JBB); if(!scanptr.p->scanLockHold) { jam(); scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; } }//Dblqh::sendScanFragConf() /* ######################################################################### */ Loading
ndb/src/kernel/blocks/dbtc/Dbtc.hpp +6 −5 Original line number Diff line number Diff line Loading @@ -1054,9 +1054,8 @@ public: // Id of the ScanRecord this fragment scan belongs to Uint32 scanRec; // The maximum number of operations that can be scanned before // returning to TC Uint16 scanFragConcurrency; // The value of fragmentCompleted in the last received SCAN_FRAGCONF Uint8 m_scan_frag_conf_status; inline void startFragTimer(Uint32 timeVal){ scanFragTimer = timeVal; Loading Loading @@ -1193,8 +1192,10 @@ public: // Number of operation records per scanned fragment // Number of operations in first batch // Max number of bytes per batch Uint16 noOprecPerFrag; Uint16 first_batch_size; union { Uint16 first_batch_size_rows; Uint16 batch_size_rows; }; Uint32 batch_byte_size; Uint32 scanRequestInfo; // ScanFrag format Loading
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +52 −28 Original line number Diff line number Diff line Loading @@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanTableref = tabptr.i; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; scanptr.p->noOprecPerFrag = noOprecPerFrag; scanptr.p->first_batch_size= scanTabReq->first_batch_size; scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size; scanptr.p->batch_byte_size = scanTabReq->batch_byte_size; scanptr.p->batch_size_rows = noOprecPerFrag; Uint32 tmp = 0; const UintR ri = scanTabReq->requestInfo; Loading @@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ndbrequire(list.seize(ptr)); ptr.p->scanRec = scanptr.i; ptr.p->scanFragId = 0; ptr.p->scanFragConcurrency = noOprecPerFrag; ptr.p->m_apiPtr = cdata[i]; }//for Loading Loading @@ -9141,6 +9140,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; const Uint32 noCompletedOps = conf->completedOps; const Uint32 status = conf->fragmentCompleted; scanFragptr.i = conf->senderData; c_scan_frag_pool.getPtr(scanFragptr); Loading @@ -9163,11 +9163,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); const Uint32 status = conf->fragmentCompleted; if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ jam(); if(status == ZFALSE){ if(status == 0){ /** * We have started closing = we sent a close -> ignore this */ Loading @@ -9184,11 +9182,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) return; } if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ if(noCompletedOps == 0 && status != 0 && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ /** * Start on next fragment */ ndbrequire(noCompletedOps == 0); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; scanFragptr.p->startFragTimer(ctcTimer); Loading Loading @@ -9218,6 +9216,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) scanptr.p->m_queued_count++; } scanFragptr.p->m_scan_frag_conf_status = status; scanFragptr.p->m_ops = noCompletedOps; scanFragptr.p->m_totalLen = total_len; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; Loading Loading @@ -9330,11 +9329,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) // Copy op ptrs so I dont overwrite them when sending... memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; nextReq->closeFlag = ZFALSE; nextReq->transId1 = apiConnectptr.p->transid[0]; nextReq->transId2 = apiConnectptr.p->transid[1]; nextReq->batch_size_bytes= scanP->batch_byte_size; ScanFragNextReq tmp; tmp.closeFlag = ZFALSE; tmp.transId1 = apiConnectptr.p->transid[0]; tmp.transId2 = apiConnectptr.p->transid[1]; tmp.batch_size_rows = scanP->batch_size_rows; tmp.batch_size_bytes = scanP->batch_byte_size; ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); Loading @@ -9344,15 +9344,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) c_scan_frag_pool.getPtr(scanFragptr); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED); scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; scanFragptr.p->startFragTimer(ctcTimer); scanFragptr.p->m_ops = 0; nextReq->senderData = scanFragptr.i; nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency; if(scanFragptr.p->m_scan_frag_conf_status) { /** * last scan was complete */ jam(); ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++; signal->theData[0] = tcConnectptr.p->dihConnectptr; signal->theData[1] = scanFragptr.i; signal->theData[2] = scanptr.p->scanTableref; signal->theData[3] = scanFragptr.p->scanFragId; sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); } else { jam(); scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend(); * req = tmp; req->senderData = scanFragptr.i; sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); } delivered.remove(scanFragptr); running.add(scanFragptr); }//for Loading Loading @@ -9551,7 +9573,7 @@ void Dbtc::sendScanFragReq(Signal* signal, req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; req->clientOpPtr = scanFragP->m_apiPtr; req->batch_size_rows= scanFragP->scanFragConcurrency; req->batch_size_rows= scanP->batch_size_rows; req->batch_size_bytes= scanP->batch_byte_size; sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); Loading @@ -9574,6 +9596,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ops += 21; } Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId; ScanTabConf * conf = (ScanTabConf*)&signal->theData[0]; conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect; conf->requestInfo = op_count; Loading @@ -9588,17 +9612,18 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ScanFragRecPtr curr = ptr; // Remove while iterating... queued.next(ptr); bool done = curr.p->m_scan_frag_conf_status && --left; * ops++ = curr.p->m_apiPtr; * ops++ = curr.i; * ops++ = done ? RNIL : curr.i; * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; queued.remove(curr); if(curr.p->m_ops > 0){ if(!done){ delivered.add(curr); curr.p->scanFragState = ScanFragRec::DELIVERED; curr.p->stopFragTimer(); } else { (* --ops) = ScanTabConf::EndOfData; ops++; c_scan_frag_pool.release(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); Loading Loading @@ -10424,9 +10449,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sfp.i, sfp.p->scanFragState, sfp.p->scanFragId); infoEvent(" nodeid=%d, concurr=%d, timer=%d", infoEvent(" nodeid=%d, timer=%d", refToNode(sfp.p->lqhBlockref), sfp.p->scanFragConcurrency, sfp.p->scanFragTimer); } Loading Loading @@ -10504,7 +10528,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sp.p->scanAiLength, sp.p->scanParallel, sp.p->scanReceivedOperations, sp.p->noOprecPerFrag); sp.p->batch_size_rows); infoEvent(" schv=%d, tab=%d, sproc=%d", sp.p->scanSchemaVersion, sp.p->scanTableref, Loading
ndb/src/kernel/blocks/suma/Suma.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; Loading