Loading storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +179 −108 Original line number Diff line number Diff line Loading @@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal) if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType)) { fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED; //fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION; } else Loading Loading @@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal) jamEntry(); tcConnectptr.i = tcIndex; ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec); TcConnectionrec * regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; FragrecordPtr regFragptr; regFragptr.i = tcConnectptr.p->fragmentptr; Loading Loading @@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal) // Abort was not ready to start until this signal came back. Now we are ready // to start the abort. /* ------------------------------------------------------------------------- */ if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); ndbrequire(regTcPtr->m_nr_delete.m_cnt); regTcPtr->m_nr_delete.m_cnt--; if (regTcPtr->m_nr_delete.m_cnt) { jam(); /** * Let operation wait for pending NR operations * even for before writing log...(as it's simpler) */ #ifdef VM_TRACE /** * Only disk table can have pending ops... */ TablerecPtr tablePtr; tablePtr.i = regTcPtr->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif return; } } abortCommonLab(signal); break; case TcConnectionrec::WAIT_ACC_ABORT: Loading @@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal) tcConnectptr.i = tupKeyRef->userRef; terrorCode = tupKeyRef->errorCode; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec* regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; FragrecordPtr regFragptr; regFragptr.i = tcConnectptr.p->fragmentptr; regFragptr.i = regTcPtr->fragmentptr; c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; TcConnectionrec* regTcPtr = tcConnectptr.p; if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); ndbrequire(regTcPtr->m_nr_delete.m_cnt); regTcPtr->m_nr_delete.m_cnt--; ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); } switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: jam(); Loading Loading @@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); jamEntry(); execACC_ABORTCONF(signal); packLqhkeyreqLab(signal); } } Loading Loading @@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) if (TRACENR_FLAG) TRACENR(" performing DELETE key: " << dst[0] << endl); regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref(); if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr) { regTcPtr.p->hashValue = calculateHash(tableId, dst); } else nr_copy_delete_row(signal, regTcPtr, ®TcPtr.p->m_row_id, len); ndbassert(regTcPtr.p->m_nr_delete.m_cnt); regTcPtr.p->m_nr_delete.m_cnt--; // No real op is run if (regTcPtr.p->m_nr_delete.m_cnt) { regTcPtr.p->hashValue = md5_hash((Uint64*)dst, len); jam(); return; } goto run; packLqhkeyreqLab(signal); return; } else if (len == 0 && op == ZDELETE) { Loading Loading @@ -3993,9 +4033,7 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) signal->theData[0] = regTcPtr.p->tupConnectrec; EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC_ABORT; signal->theData[0] = regTcPtr.i; execACC_ABORTCONF(signal); packLqhkeyreqLab(signal); } int Loading Loading @@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id) op->m_gci = tcPtr.p->gci; op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr; ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->m_nr_delete.m_cnt); Loading Loading @@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) tcPtr.i = op->m_ptr_i; ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec); ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->m_nr_delete.m_cnt); tcPtr.p->m_nr_delete.m_cnt--; if (tcPtr.p->m_nr_delete.m_cnt == 0) { jam(); tcConnectptr = tcPtr; c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr); if (tcPtr.p->abortState != TcConnectionrec::ABORT_IDLE) { jam(); tcPtr.p->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); } else if (tcPtr.p->operation == ZDELETE && LqhKeyReq::getNrCopyFlag(tcPtr.p->reqinfo)) { /** * This is run directly in handle_nr_copy */ jam(); packLqhkeyreqLab(signal); } else { jam(); rwConcludedLab(signal); } return; } Loading Loading @@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal) return; }//if // reset the activeCreat since that is only valid in cases where the record was not present. /* ------------------------------------------------------------------------ * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO Loading Loading @@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr, } else { regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP; TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); ref->userRef= regTcPtr.i; ref->errorCode= ~0; Loading Loading @@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, } else { regTcPtr->transactionState = TcConnectionrec::WAIT_TUP; TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); ref->userRef= callbackData; ref->errorCode= disk_page; Loading @@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, * -------------------------------------------------------------------------- */ void Dblqh::tupkeyConfLab(Signal* signal) { /* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---- */ /* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED --- */ const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; if (regTcPtr->simpleRead) { jam(); /* ---------------------------------------------------------------------- Loading @@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal) }//if regTcPtr->totSendlenAi = tupKeyConf->writeLength; ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); ndbrequire(regTcPtr->m_nr_delete.m_cnt); regTcPtr->m_nr_delete.m_cnt--; if (regTcPtr->m_nr_delete.m_cnt) { jam(); /** * Let operation wait for pending NR operations * even for before writing log...(as it's simpler) */ #ifdef VM_TRACE /** * Only disk table can have pending ops... */ TablerecPtr tablePtr; tablePtr.i = regTcPtr->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif return; } } rwConcludedLab(signal); return; }//Dblqh::tupkeyConfLab() Loading Loading @@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal, /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */ /*THOSE SIGNALS INTERNALLY. */ /* ------------------------------------------------------------------------- */ if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) { if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) { jam(); if (activeCreat == Fragrecord::AC_NR_COPY && tcPtrP->m_nr_delete.m_cnt > 1) if (activeCreat == Fragrecord::AC_NR_COPY) { jam(); /** * Nr delete waiting for disk delete to complete... */ #ifdef VM_TRACE TablerecPtr tablePtr; tablePtr.i = tcPtrP->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif tcPtrP->m_nr_delete.m_cnt--; tcPtrP->transactionState = TcConnectionrec::WAIT_TUP_COMMIT; return; ndbrequire(LqhKeyReq::getNrCopyFlag(tcPtrP->reqinfo)); ndbrequire(tcPtrP->m_nr_delete.m_cnt == 0); } packLqhkeyreqLab(signal); } else { } else { ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC); jam(); sendLqhTransconf(signal, LqhTransConf::Committed); Loading Loading @@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal) }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; if (ERROR_INSERTED(5100)) { SET_ERROR_INSERT_VALUE(5101); Loading @@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal) sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB); }//if regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC; regTcPtr->activeCreat = Fragrecord::AC_NORMAL; const Uint32 commitAckMarker = regTcPtr->commitAckMarker; if(commitAckMarker != RNIL){ if(commitAckMarker != RNIL) { jam(); #ifdef MARKER_TRACE { Loading Loading @@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal) return; }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; if (regTcPtr->transactionState != TcConnectionrec::PREPARED) { warningReport(signal, 10); return; Loading @@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal) regTcPtr->reqBlockref = reqBlockref; regTcPtr->reqRef = reqPtr; regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC; regTcPtr->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); return; }//Dblqh::execABORTREQ() Loading Loading @@ -6704,22 +6785,7 @@ void Dblqh::execACCKEYREF(Signal* signal) } if (tcPtr->activeCreat == Fragrecord::AC_NR_COPY) { jam(); Uint32 op = tcPtr->operation; switch(errCode){ case ZNO_TUPLE_FOUND: ndbrequire(op == ZDELETE); break; break; default: ndbrequire(false); } tcPtr->activeCreat = Fragrecord::AC_IGNORED; } else { ndbrequire(tcPtr->activeCreat == Fragrecord::AC_NORMAL); ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo)); /** Loading @@ -6739,7 +6805,6 @@ void Dblqh::execACCKEYREF(Signal* signal) (tcPtr->seqNoReplica == 0 || errCode != ZTUPLE_ALREADY_EXIST || (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple))); } tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; abortCommonLab(signal); Loading @@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal) jam(); return; }//if regTcPtr->activeCreat = Fragrecord::AC_NORMAL; regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->errorCode = terrorCode; abortStateHandlerLab(signal); Loading Loading @@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal) regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->errorCode = terrorCode; }//if /* ----------------------------------------------------------------------- * ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED * WITH NORMAL ABORT HANDLING. * ----------------------------------------------------------------------- */ regTcPtr->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); return; }//Dblqh::abortErrorLab() Loading @@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal) { TcConnectionrec * const regTcPtr = tcConnectptr.p; const Uint32 commitAckMarker = regTcPtr->commitAckMarker; if(regTcPtr->activeCreat != Fragrecord::AC_IGNORED && commitAckMarker != RNIL){ const Uint32 activeCreat = regTcPtr->activeCreat; if (commitAckMarker != RNIL) { /** * There is no NR ongoing and we have a marker */ Loading @@ -6959,6 +7019,29 @@ void Dblqh::abortCommonLab(Signal* signal) regTcPtr->commitAckMarker = RNIL; } if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); if (regTcPtr->m_nr_delete.m_cnt) { jam(); /** * Let operation wait for pending NR operations */ #ifdef VM_TRACE /** * Only disk table can have pending ops... */ TablerecPtr tablePtr; tablePtr.i = regTcPtr->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif return; } } fragptr.i = regTcPtr->fragmentptr; if (fragptr.i != RNIL) { jam(); Loading Loading @@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec * const regTcPtr = tcConnectptr.p; ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); if (regTcPtr->activeCreat == Fragrecord::AC_IGNORED) { /* ---------------------------------------------------------------------- * A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE * WITH NORMAL COMMIT PROCESSING. * --------------------------------------------------------------------- */ if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) { jam(); regTcPtr->abortState = TcConnectionrec::ABORT_IDLE; fragptr.i = regTcPtr->fragmentptr; c_fragment_pool.getPtr(fragptr); rwConcludedLab(signal); return; } else { ndbrequire(regTcPtr->currTupAiLen < regTcPtr->totReclenAi); jam(); regTcPtr->transactionState = TcConnectionrec::WAIT_AI_AFTER_ABORT; return; }//if }//if continueAbortLab(signal); return; }//Dblqh::execACC_ABORTCONF() Loading Loading @@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req, tcConnectptr.p->m_offset_current_keybuf = 0; tcConnectptr.p->m_scan_curr_range_no = 0; tcConnectptr.p->m_dealloc = 0; tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL; TablerecPtr tTablePtr; tTablePtr.i = tabptr.p->primaryTableId; ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec); Loading Loading @@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal) */ fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; if (0) scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); if (false && fragptr.p->tabRef > 4) { ndbout_c("STOPPING COPY (%d -> %d %d %d)", scanptr.p->scanBlockref, ndbout_c("STOPPING COPY X = [ %d %d %d %d ]", refToBlock(scanptr.p->scanBlockref), scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT); /** * RESTART: > DUMP 7020 332 X */ return; } scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[1] = RNIL; signal->theData[2] = NextScanReq::ZSCAN_NEXT; Loading Loading @@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) << " tcBlockref = " << hex << tcRec.p->tcBlockref << " reqBlockref = " << hex << tcRec.p->reqBlockref << " primKeyLen = " << tcRec.p->primKeyLen << " nrcopyflag = " << LqhKeyReq::getNrCopyFlag(tcRec.p->reqinfo) << endl; ndbout << " nextReplica = " << tcRec.p->nextReplica << " tcBlockref = " << hex << tcRec.p->tcBlockref Loading Loading @@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) << endl; ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2] << " tupkeyData3 = " << tcRec.p->tupkeyData[3] << " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt << endl; switch (tcRec.p->transactionState) { storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +37 −0 Original line number Diff line number Diff line Loading @@ -484,6 +484,14 @@ Dbtup::load_diskpage(Signal* signal, req.m_callback.m_callbackFunction= safe_cast(&Dbtup::disk_page_load_callback); #ifdef ERROR_INSERTED if (ERROR_INSERTED(4022)) { flags |= Page_cache_client::DELAY_REQ; req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000; } #endif if((res= m_pgman.get_page(signal, req, flags)) > 0) { //ndbout_c("in cache"); Loading Loading @@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData, preq.m_callback.m_callbackFunction = safe_cast(&Dbtup::nr_delete_page_callback); int flags = Page_cache_client::COMMIT_REQ; #ifdef ERROR_INSERT if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024)) { int rnd = rand() % 100; int slp = 0; if (ERROR_INSERTED(4024)) { slp = 3000; } else if (rnd > 90) { slp = 3000; } else if (rnd > 70) { slp = 100; } ndbout_c("rnd: %d slp: %d", rnd, slp); if (slp) { flags |= Page_cache_client::DELAY_REQ; preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp; } } #endif res = m_pgman.get_page(signal, preq, flags); if (res == 0) { Loading storage/ndb/src/kernel/blocks/pgman.cpp +48 −5 Original line number Diff line number Diff line Loading @@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal) int max_count = 1; Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK]; while (! pl_callback.isEmpty() && --max_count >= 0) { jam(); Ptr<Page_entry> ptr; pl_callback.first(ptr); if (! process_callback(signal, ptr)) while (! ptr.isNull() && --max_count >= 0) { jam(); Ptr<Page_entry> curr = ptr; pl_callback.next(ptr); if (! process_callback(signal, curr)) { jam(); break; Loading Loading @@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr) #ifdef VM_TRACE debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl; #endif #ifdef ERROR_INSERT if (req_ptr.p->m_flags & Page_request::DELAY_REQ) { Uint64 now = NdbTick_CurrentMillisecond(); if (now < req_ptr.p->m_delay_until_time) { break; } } #endif b = globalData.getBlock(req_ptr.p->m_block); callback = req_ptr.p->m_callback; Loading Loading @@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr) state |= Page_entry::MAPPED; set_page_state(ptr, state); { /** * Update lsn record on page * as it can be modified/flushed wo/ update_lsn has been called * (e.g. prealloc) and it then would get lsn 0, which is bad * when running undo and following SR */ Ptr<GlobalPage> pagePtr; m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i); File_formats::Datafile::Data_page* page = (File_formats::Datafile::Data_page*)pagePtr.p; Uint64 lsn = 0; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_lo; ptr.p->m_lsn = lsn; } ndbrequire(m_stats.m_current_io_waits > 0); m_stats.m_current_io_waits--; Loading Loading @@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) bool only_request = ptr.p->m_requests.isEmpty(); if (req_flags & Page_request::DELAY_REQ) { jam(); only_request = false; } if (only_request && state & Page_entry::MAPPED) { Loading Loading @@ -1623,6 +1663,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) req_ptr.p->m_block = page_req.m_block; req_ptr.p->m_flags = page_req.m_flags; req_ptr.p->m_callback = page_req.m_callback; #ifdef ERROR_INSERT req_ptr.p->m_delay_until_time = page_req.m_delay_until_time; #endif state |= Page_entry::REQUEST; if (only_request && req_flags & Page_request::EMPTY_PAGE) Loading storage/ndb/src/kernel/blocks/pgman.hpp +18 −2 Original line number Diff line number Diff line Loading @@ -256,12 +256,18 @@ private: ,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn ,UNLOCK_PAGE = 0x0400 ,CORR_REQ = 0x0800 // correlated request (no LIRS update) #ifdef ERROR_INSERT ,DELAY_REQ = 0x1000 // Force request to be delayed #endif }; Uint16 m_block; Uint16 m_flags; SimulatedBlock::Callback m_callback; #ifdef ERROR_INSERT Uint64 m_delay_until_time; #endif Uint32 nextList; Uint32 m_magic; }; Loading Loading @@ -508,6 +514,10 @@ public: struct Request { Local_key m_page; SimulatedBlock::Callback m_callback; #ifdef ERROR_INSERT Uint64 m_delay_until_time; #endif }; Ptr<GlobalPage> m_ptr; // TODO remove Loading @@ -520,6 +530,9 @@ public: ,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ ,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE ,CORR_REQ = Pgman::Page_request::CORR_REQ #ifdef ERROR_INSERT ,DELAY_REQ = Pgman::Page_request::DELAY_REQ #endif }; /** Loading Loading @@ -588,6 +601,9 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags) page_req.m_block = m_block; page_req.m_flags = flags; page_req.m_callback = req.m_callback; #ifdef ERROR_INSERT page_req.m_delay_until_time = req.m_delay_until_time; #endif int i = m_pgman->get_page(signal, entry_ptr, page_req); if (i > 0) Loading Loading
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +179 −108 Original line number Diff line number Diff line Loading @@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal) if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType)) { fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED; //fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION; } else Loading Loading @@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal) jamEntry(); tcConnectptr.i = tcIndex; ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec); TcConnectionrec * regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; FragrecordPtr regFragptr; regFragptr.i = tcConnectptr.p->fragmentptr; Loading Loading @@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal) // Abort was not ready to start until this signal came back. Now we are ready // to start the abort. /* ------------------------------------------------------------------------- */ if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); ndbrequire(regTcPtr->m_nr_delete.m_cnt); regTcPtr->m_nr_delete.m_cnt--; if (regTcPtr->m_nr_delete.m_cnt) { jam(); /** * Let operation wait for pending NR operations * even for before writing log...(as it's simpler) */ #ifdef VM_TRACE /** * Only disk table can have pending ops... */ TablerecPtr tablePtr; tablePtr.i = regTcPtr->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif return; } } abortCommonLab(signal); break; case TcConnectionrec::WAIT_ACC_ABORT: Loading @@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal) tcConnectptr.i = tupKeyRef->userRef; terrorCode = tupKeyRef->errorCode; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec* regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; FragrecordPtr regFragptr; regFragptr.i = tcConnectptr.p->fragmentptr; regFragptr.i = regTcPtr->fragmentptr; c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; TcConnectionrec* regTcPtr = tcConnectptr.p; if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); ndbrequire(regTcPtr->m_nr_delete.m_cnt); regTcPtr->m_nr_delete.m_cnt--; ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); } switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: jam(); Loading Loading @@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); jamEntry(); execACC_ABORTCONF(signal); packLqhkeyreqLab(signal); } } Loading Loading @@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) if (TRACENR_FLAG) TRACENR(" performing DELETE key: " << dst[0] << endl); regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref(); if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr) { regTcPtr.p->hashValue = calculateHash(tableId, dst); } else nr_copy_delete_row(signal, regTcPtr, ®TcPtr.p->m_row_id, len); ndbassert(regTcPtr.p->m_nr_delete.m_cnt); regTcPtr.p->m_nr_delete.m_cnt--; // No real op is run if (regTcPtr.p->m_nr_delete.m_cnt) { regTcPtr.p->hashValue = md5_hash((Uint64*)dst, len); jam(); return; } goto run; packLqhkeyreqLab(signal); return; } else if (len == 0 && op == ZDELETE) { Loading Loading @@ -3993,9 +4033,7 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) signal->theData[0] = regTcPtr.p->tupConnectrec; EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC_ABORT; signal->theData[0] = regTcPtr.i; execACC_ABORTCONF(signal); packLqhkeyreqLab(signal); } int Loading Loading @@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id) op->m_gci = tcPtr.p->gci; op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr; ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->m_nr_delete.m_cnt); Loading Loading @@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) tcPtr.i = op->m_ptr_i; ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec); ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->m_nr_delete.m_cnt); tcPtr.p->m_nr_delete.m_cnt--; if (tcPtr.p->m_nr_delete.m_cnt == 0) { jam(); tcConnectptr = tcPtr; c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr); if (tcPtr.p->abortState != TcConnectionrec::ABORT_IDLE) { jam(); tcPtr.p->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); } else if (tcPtr.p->operation == ZDELETE && LqhKeyReq::getNrCopyFlag(tcPtr.p->reqinfo)) { /** * This is run directly in handle_nr_copy */ jam(); packLqhkeyreqLab(signal); } else { jam(); rwConcludedLab(signal); } return; } Loading Loading @@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal) return; }//if // reset the activeCreat since that is only valid in cases where the record was not present. /* ------------------------------------------------------------------------ * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO Loading Loading @@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr, } else { regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP; TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); ref->userRef= regTcPtr.i; ref->errorCode= ~0; Loading Loading @@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, } else { regTcPtr->transactionState = TcConnectionrec::WAIT_TUP; TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); ref->userRef= callbackData; ref->errorCode= disk_page; Loading @@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, * -------------------------------------------------------------------------- */ void Dblqh::tupkeyConfLab(Signal* signal) { /* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---- */ /* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED --- */ const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; if (regTcPtr->simpleRead) { jam(); /* ---------------------------------------------------------------------- Loading @@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal) }//if regTcPtr->totSendlenAi = tupKeyConf->writeLength; ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); ndbrequire(regTcPtr->m_nr_delete.m_cnt); regTcPtr->m_nr_delete.m_cnt--; if (regTcPtr->m_nr_delete.m_cnt) { jam(); /** * Let operation wait for pending NR operations * even for before writing log...(as it's simpler) */ #ifdef VM_TRACE /** * Only disk table can have pending ops... */ TablerecPtr tablePtr; tablePtr.i = regTcPtr->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif return; } } rwConcludedLab(signal); return; }//Dblqh::tupkeyConfLab() Loading Loading @@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal, /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */ /*THOSE SIGNALS INTERNALLY. */ /* ------------------------------------------------------------------------- */ if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) { if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) { jam(); if (activeCreat == Fragrecord::AC_NR_COPY && tcPtrP->m_nr_delete.m_cnt > 1) if (activeCreat == Fragrecord::AC_NR_COPY) { jam(); /** * Nr delete waiting for disk delete to complete... */ #ifdef VM_TRACE TablerecPtr tablePtr; tablePtr.i = tcPtrP->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif tcPtrP->m_nr_delete.m_cnt--; tcPtrP->transactionState = TcConnectionrec::WAIT_TUP_COMMIT; return; ndbrequire(LqhKeyReq::getNrCopyFlag(tcPtrP->reqinfo)); ndbrequire(tcPtrP->m_nr_delete.m_cnt == 0); } packLqhkeyreqLab(signal); } else { } else { ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC); jam(); sendLqhTransconf(signal, LqhTransConf::Committed); Loading Loading @@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal) }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; if (ERROR_INSERTED(5100)) { SET_ERROR_INSERT_VALUE(5101); Loading @@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal) sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB); }//if regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC; regTcPtr->activeCreat = Fragrecord::AC_NORMAL; const Uint32 commitAckMarker = regTcPtr->commitAckMarker; if(commitAckMarker != RNIL){ if(commitAckMarker != RNIL) { jam(); #ifdef MARKER_TRACE { Loading Loading @@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal) return; }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; if (regTcPtr->transactionState != TcConnectionrec::PREPARED) { warningReport(signal, 10); return; Loading @@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal) regTcPtr->reqBlockref = reqBlockref; regTcPtr->reqRef = reqPtr; regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC; regTcPtr->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); return; }//Dblqh::execABORTREQ() Loading Loading @@ -6704,22 +6785,7 @@ void Dblqh::execACCKEYREF(Signal* signal) } if (tcPtr->activeCreat == Fragrecord::AC_NR_COPY) { jam(); Uint32 op = tcPtr->operation; switch(errCode){ case ZNO_TUPLE_FOUND: ndbrequire(op == ZDELETE); break; break; default: ndbrequire(false); } tcPtr->activeCreat = Fragrecord::AC_IGNORED; } else { ndbrequire(tcPtr->activeCreat == Fragrecord::AC_NORMAL); ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo)); /** Loading @@ -6739,7 +6805,6 @@ void Dblqh::execACCKEYREF(Signal* signal) (tcPtr->seqNoReplica == 0 || errCode != ZTUPLE_ALREADY_EXIST || (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple))); } tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; abortCommonLab(signal); Loading @@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal) jam(); return; }//if regTcPtr->activeCreat = Fragrecord::AC_NORMAL; regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->errorCode = terrorCode; abortStateHandlerLab(signal); Loading Loading @@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal) regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->errorCode = terrorCode; }//if /* ----------------------------------------------------------------------- * ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED * WITH NORMAL ABORT HANDLING. * ----------------------------------------------------------------------- */ regTcPtr->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); return; }//Dblqh::abortErrorLab() Loading @@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal) { TcConnectionrec * const regTcPtr = tcConnectptr.p; const Uint32 commitAckMarker = regTcPtr->commitAckMarker; if(regTcPtr->activeCreat != Fragrecord::AC_IGNORED && commitAckMarker != RNIL){ const Uint32 activeCreat = regTcPtr->activeCreat; if (commitAckMarker != RNIL) { /** * There is no NR ongoing and we have a marker */ Loading @@ -6959,6 +7019,29 @@ void Dblqh::abortCommonLab(Signal* signal) regTcPtr->commitAckMarker = RNIL; } if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); if (regTcPtr->m_nr_delete.m_cnt) { jam(); /** * Let operation wait for pending NR operations */ #ifdef VM_TRACE /** * Only disk table can have pending ops... */ TablerecPtr tablePtr; tablePtr.i = regTcPtr->tableref; ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); ndbrequire(tablePtr.p->m_disk_table); #endif return; } } fragptr.i = regTcPtr->fragmentptr; if (fragptr.i != RNIL) { jam(); Loading Loading @@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec * const regTcPtr = tcConnectptr.p; ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); if (regTcPtr->activeCreat == Fragrecord::AC_IGNORED) { /* ---------------------------------------------------------------------- * A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE * WITH NORMAL COMMIT PROCESSING. * --------------------------------------------------------------------- */ if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) { jam(); regTcPtr->abortState = TcConnectionrec::ABORT_IDLE; fragptr.i = regTcPtr->fragmentptr; c_fragment_pool.getPtr(fragptr); rwConcludedLab(signal); return; } else { ndbrequire(regTcPtr->currTupAiLen < regTcPtr->totReclenAi); jam(); regTcPtr->transactionState = TcConnectionrec::WAIT_AI_AFTER_ABORT; return; }//if }//if continueAbortLab(signal); return; }//Dblqh::execACC_ABORTCONF() Loading Loading @@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req, tcConnectptr.p->m_offset_current_keybuf = 0; tcConnectptr.p->m_scan_curr_range_no = 0; tcConnectptr.p->m_dealloc = 0; tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL; TablerecPtr tTablePtr; tTablePtr.i = tabptr.p->primaryTableId; ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec); Loading Loading @@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal) */ fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; if (0) scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); if (false && fragptr.p->tabRef > 4) { ndbout_c("STOPPING COPY (%d -> %d %d %d)", scanptr.p->scanBlockref, ndbout_c("STOPPING COPY X = [ %d %d %d %d ]", refToBlock(scanptr.p->scanBlockref), scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT); /** * RESTART: > DUMP 7020 332 X */ return; } scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[1] = RNIL; signal->theData[2] = NextScanReq::ZSCAN_NEXT; Loading Loading @@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) << " tcBlockref = " << hex << tcRec.p->tcBlockref << " reqBlockref = " << hex << tcRec.p->reqBlockref << " primKeyLen = " << tcRec.p->primKeyLen << " nrcopyflag = " << LqhKeyReq::getNrCopyFlag(tcRec.p->reqinfo) << endl; ndbout << " nextReplica = " << tcRec.p->nextReplica << " tcBlockref = " << hex << tcRec.p->tcBlockref Loading Loading @@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) << endl; ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2] << " tupkeyData3 = " << tcRec.p->tupkeyData[3] << " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt << endl; switch (tcRec.p->transactionState) {
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +37 −0 Original line number Diff line number Diff line Loading @@ -484,6 +484,14 @@ Dbtup::load_diskpage(Signal* signal, req.m_callback.m_callbackFunction= safe_cast(&Dbtup::disk_page_load_callback); #ifdef ERROR_INSERTED if (ERROR_INSERTED(4022)) { flags |= Page_cache_client::DELAY_REQ; req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000; } #endif if((res= m_pgman.get_page(signal, req, flags)) > 0) { //ndbout_c("in cache"); Loading Loading @@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData, preq.m_callback.m_callbackFunction = safe_cast(&Dbtup::nr_delete_page_callback); int flags = Page_cache_client::COMMIT_REQ; #ifdef ERROR_INSERT if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024)) { int rnd = rand() % 100; int slp = 0; if (ERROR_INSERTED(4024)) { slp = 3000; } else if (rnd > 90) { slp = 3000; } else if (rnd > 70) { slp = 100; } ndbout_c("rnd: %d slp: %d", rnd, slp); if (slp) { flags |= Page_cache_client::DELAY_REQ; preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp; } } #endif res = m_pgman.get_page(signal, preq, flags); if (res == 0) { Loading
storage/ndb/src/kernel/blocks/pgman.cpp +48 −5 Original line number Diff line number Diff line Loading @@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal) int max_count = 1; Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK]; while (! pl_callback.isEmpty() && --max_count >= 0) { jam(); Ptr<Page_entry> ptr; pl_callback.first(ptr); if (! process_callback(signal, ptr)) while (! ptr.isNull() && --max_count >= 0) { jam(); Ptr<Page_entry> curr = ptr; pl_callback.next(ptr); if (! process_callback(signal, curr)) { jam(); break; Loading Loading @@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr) #ifdef VM_TRACE debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl; #endif #ifdef ERROR_INSERT if (req_ptr.p->m_flags & Page_request::DELAY_REQ) { Uint64 now = NdbTick_CurrentMillisecond(); if (now < req_ptr.p->m_delay_until_time) { break; } } #endif b = globalData.getBlock(req_ptr.p->m_block); callback = req_ptr.p->m_callback; Loading Loading @@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr) state |= Page_entry::MAPPED; set_page_state(ptr, state); { /** * Update lsn record on page * as it can be modified/flushed wo/ update_lsn has been called * (e.g. prealloc) and it then would get lsn 0, which is bad * when running undo and following SR */ Ptr<GlobalPage> pagePtr; m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i); File_formats::Datafile::Data_page* page = (File_formats::Datafile::Data_page*)pagePtr.p; Uint64 lsn = 0; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_lo; ptr.p->m_lsn = lsn; } ndbrequire(m_stats.m_current_io_waits > 0); m_stats.m_current_io_waits--; Loading Loading @@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) bool only_request = ptr.p->m_requests.isEmpty(); if (req_flags & Page_request::DELAY_REQ) { jam(); only_request = false; } if (only_request && state & Page_entry::MAPPED) { Loading Loading @@ -1623,6 +1663,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) req_ptr.p->m_block = page_req.m_block; req_ptr.p->m_flags = page_req.m_flags; req_ptr.p->m_callback = page_req.m_callback; #ifdef ERROR_INSERT req_ptr.p->m_delay_until_time = page_req.m_delay_until_time; #endif state |= Page_entry::REQUEST; if (only_request && req_flags & Page_request::EMPTY_PAGE) Loading
storage/ndb/src/kernel/blocks/pgman.hpp +18 −2 Original line number Diff line number Diff line Loading @@ -256,12 +256,18 @@ private: ,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn ,UNLOCK_PAGE = 0x0400 ,CORR_REQ = 0x0800 // correlated request (no LIRS update) #ifdef ERROR_INSERT ,DELAY_REQ = 0x1000 // Force request to be delayed #endif }; Uint16 m_block; Uint16 m_flags; SimulatedBlock::Callback m_callback; #ifdef ERROR_INSERT Uint64 m_delay_until_time; #endif Uint32 nextList; Uint32 m_magic; }; Loading Loading @@ -508,6 +514,10 @@ public: struct Request { Local_key m_page; SimulatedBlock::Callback m_callback; #ifdef ERROR_INSERT Uint64 m_delay_until_time; #endif }; Ptr<GlobalPage> m_ptr; // TODO remove Loading @@ -520,6 +530,9 @@ public: ,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ ,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE ,CORR_REQ = Pgman::Page_request::CORR_REQ #ifdef ERROR_INSERT ,DELAY_REQ = Pgman::Page_request::DELAY_REQ #endif }; /** Loading Loading @@ -588,6 +601,9 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags) page_req.m_block = m_block; page_req.m_flags = flags; page_req.m_callback = req.m_callback; #ifdef ERROR_INSERT page_req.m_delay_until_time = req.m_delay_until_time; #endif int i = m_pgman->get_page(signal, entry_ptr, page_req); if (i > 0) Loading