Commit 1479ed3f authored by unknown's avatar unknown
Browse files

bug#4909 + testSystemRestart -n SR_FULLDB

1) Fix so that scan takeover is possible after SR
2) Reserve two pages for SR "zero pages"


ndb/include/ndbapi/NdbOperation.hpp:
  remove unused method
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Fix bug#4909
  don't reset tableFragptr during SR
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Also send reason for disallowing rollback
ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  Add bitmask of free pages to use for page 0 during SR
ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp:
  More prinout in unit test of PageMan
ndb/src/kernel/blocks/dbtup/DbtupPagMan.cpp:
  Reserve 2 pages to use for SR
ndb/src/kernel/blocks/dbtup/DbtupSystemRestart.cpp:
  Don't alloc using "normal" allocConsPages when allocating for
  0-pages during SR, instead use 2 reserved pages
parent 39496af4
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -746,8 +746,6 @@ protected:
    
  int	 prepareSendInterpreted();            // Help routine to prepare*
   
  void	 TCOPCONF(Uint32 anNdbColumnImplLen);      // Handle TC[KEY/INDX]CONF signal
  
  int	 receiveTCKEYREF(NdbApiSignal*); 


+27 −19
Original line number Diff line number Diff line
@@ -107,6 +107,7 @@ operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
#endif
//#define MARKER_TRACE 1
//#define TRACE_SCAN_TAKEOVER 1
const Uint32 NR_ScanNo = 0;
@@ -3574,6 +3575,10 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
      key.scanNumber = KeyInfo20::getScanNo(regTcPtr->tcScanInfo);
      key.fragPtrI = fragptr.i;
      c_scanTakeOverHash.find(scanptr, key);
#ifdef TRACE_SCAN_TAKEOVER
      if(scanptr.i == RNIL)
	ndbout_c("not finding (%d %d)", key.scanNumber, key.fragPtrI);
#endif
    }
    if (scanptr.i == RNIL) {
      jam();
@@ -8272,7 +8277,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
  scanptr.p->scanLocalref[1] = 0;
  scanptr.p->scanLocalFragid = 0;
  scanptr.p->scanTcWaiting = ZTRUE;
  scanptr.p->scanNumber = ZNIL;
  scanptr.p->scanNumber = ~0;
  for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
    jam();
@@ -8327,6 +8332,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
#ifdef VM_TRACE
    ScanRecordPtr tmp;
    ndbrequire(!c_scanTakeOverHash.find(tmp, * scanptr.p));
#endif
#ifdef TRACE_SCAN_TAKEOVER
    ndbout_c("adding (%d %d) table: %d fragId: %d frag.i: %d tableFragptr: %d",
	     scanptr.p->scanNumber, scanptr.p->fragPtrI,
	     tabptr.i, scanFragReq->fragmentNo, fragptr.i, fragptr.p->tableFragptr);
#endif
    c_scanTakeOverHash.add(scanptr);
  }
@@ -8418,6 +8428,9 @@ void Dblqh::finishScanrec(Signal* signal)
  if(scanptr.p->scanKeyinfoFlag){
    jam();
    ScanRecordPtr tmp;
#ifdef TRACE_SCAN_TAKEOVER
    ndbout_c("removing (%d %d)", scanptr.p->scanNumber, scanptr.p->fragPtrI);
#endif
    c_scanTakeOverHash.remove(tmp, * scanptr.p);
    ndbrequire(tmp.p == scanptr.p);
  }
@@ -8461,6 +8474,9 @@ void Dblqh::finishScanrec(Signal* signal)
    ndbrequire(!c_scanTakeOverHash.find(tmp, * restart.p));
#endif
    c_scanTakeOverHash.add(restart);
#ifdef TRACE_SCAN_TAKEOVER
    ndbout_c("adding-r (%d %d)", restart.p->scanNumber, restart.p->fragPtrI);
#endif
  }
  
  scanptr = restart;
@@ -12034,18 +12050,18 @@ void Dblqh::writeLogfileLab(Signal* signal)
/* WRITE.                                                                    */
/*---------------------------------------------------------------------------*/
  switch (logFilePtr.p->fileChangeState) {
#if 0
  case LogFileRecord::BOTH_WRITES_ONGOING:
    jam();
    ndbout_c("not crashing!!");
    // Fall-through
#endif
  case LogFileRecord::NOT_ONGOING:
    jam();
    checkGcpCompleted(signal,
                      ((lfoPtr.p->lfoPageNo + lfoPtr.p->noPagesRw) - 1),
                      lfoPtr.p->lfoWordWritten);
    break;
#if 0
  case LogFileRecord::BOTH_WRITES_ONGOING:
    jam();
    ndbout_c("not crashing!!");
    // Fall-through
#endif
  case LogFileRecord::WRITE_PAGE_ZERO_ONGOING:
  case LogFileRecord::LAST_WRITE_ONGOING:
    jam();
@@ -13133,20 +13149,11 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
  ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
  if (!getFragmentrec(signal, fragId)) {
    jam();
    /* ----------------------------------------------------------------------
     *  FRAGMENT WAS NOT DEFINED YET. PUT IT IN. IF NO LOCAL CHECKPOINT EXISTED
     *  THEN THE FRAGMENT HAS ALREADY BEEN ADDED.
     * ---------------------------------------------------------------------- */
    if (!insertFragrec(signal, fragId)) {
      jam();
    startFragRefLab(signal);
    return;
  }//if
  }//if
  tabptr.p->tableStatus = Tablerec::TABLE_DEFINED;
  
  initFragrec(signal, tabptr.i, fragId, ZPRIMARY_NODE);
  initFragrecSr(signal);
  if (startFragReq->lcpNo == ZNIL) {
    jam();
@@ -16414,6 +16421,7 @@ void Dblqh::initFragrec(Signal* signal,
  fragptr.p->execSrNoReplicas = 0;
  fragptr.p->fragDistributionKey = 0;
  fragptr.p->activeTcCounter = 0;
  fragptr.p->tableFragptr = RNIL;
}//Dblqh::initFragrec()
/* ========================================================================== 
+2 −1
Original line number Diff line number Diff line
@@ -5280,8 +5280,9 @@ void Dbtc::execTCROLLBACKREQ(Signal* signal)
    signal->theData[1] = apiConnectptr.p->transid[0];
    signal->theData[2] = apiConnectptr.p->transid[1];
    signal->theData[3] = ZROLLBACKNOTALLOWED;
    signal->theData[4] = apiConnectptr.p->apiConnectstate;
    sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_TCROLLBACKREF, 
	       signal, 4, JBB);
	       signal, 5, JBB);
    break;
                                                 /* SEND A REFUSAL SIGNAL*/
  case CS_ABORTING:
+5 −0
Original line number Diff line number Diff line
@@ -2322,10 +2322,15 @@ private:
  // Counters for num UNDO log records executed
  Uint32 cSrUndoRecords[9];

  STATIC_CONST(MAX_PARALLELL_TUP_SRREQ = 2); 
  Uint32 c_sr_free_page_0;

  Uint32 c_errorInsert4000TableId;

  void initGlobalTemporaryVars();
  void reportMemoryUsage(Signal* signal, int incDec);

  
#ifdef VM_TRACE
  struct Th {
    Uint32 data[1];
+7 −0
Original line number Diff line number Diff line
@@ -201,6 +201,10 @@ Dbtup::execDUMP_STATE_ORD(Signal* signal)
	ndbrequire(chunk.pageCount <= alloc);
	if(chunk.pageCount != 0){
	  chunks.push_back(chunk);
	  if(chunk.pageCount != alloc) {
	    ndbout_c("  Tried to allocate %d - only allocated %d - free: %d",
		     alloc, chunk.pageCount, free);
	  }
	} else {
	  ndbout_c("  Failed to alloc %d pages with %d pages free",
		   alloc, free);
@@ -212,6 +216,9 @@ Dbtup::execDUMP_STATE_ORD(Signal* signal)
	  ptrCheckGuard(pagePtr, cnoOfPage, page);
	  pagePtr.p->pageWord[ZPAGE_STATE_POS] = ~ZFREE_COMMON;
	}

	if(alloc == 1 && free > 0)
	  ndbrequire(chunk.pageCount == alloc);
      }
	break;
      }
Loading