Commit 5bc7f5f9 authored by unknown's avatar unknown
Browse files

ndb dd

  Fix SR bug that extent pages was scanned before undo was run
  Fix bug wrt page flushing/tsman and tup's dirty page list


storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  remove some unused code
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Add LCP_PREPARE to pgman
  Change order between TSMAN/LGMAN START_RECREQ
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/diskpage.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/lgman.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/pgman.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/pgman.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/tsman.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/tsman.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/vm/DLFifoList.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/test/tools/hugoLoad.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/tools/delete_all.cpp:
  Fix dd SR + dd free space bugs
parent 3e217c81
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -29,7 +29,8 @@ private:
    STATS_LOOP = 0,
    BUSY_LOOP = 1,
    CLEANUP_LOOP = 2,
    LCP_LOOP = 3
    LCP_LOOP = 3,
    LCP_PREPARE = 4
  };
};

+1 −9
Original line number Diff line number Diff line
@@ -1009,8 +1009,6 @@ public:
      LCP_SR_STARTED = 10,
      LCP_SR_COMPLETED = 11
    };
    LcpLocRecord m_acc;
    LcpLocRecord m_tup;
 
    LcpState lcpState;
    bool firstFragmentFlag;
@@ -1028,6 +1026,7 @@ public:
    bool   reportEmpty;
    NdbNodeBitmask m_EMPTY_LCP_REQ;

    Uint32 m_error;
    Uint32 m_outstanding;
  }; // Size 76 bytes
  typedef Ptr<LcpRecord> LcpRecordPtr;
@@ -2248,10 +2247,6 @@ private:
  bool checkLcpStarted(Signal* signal);
  void checkLcpTupprep(Signal* signal);
  void getNextFragForLcp(Signal* signal);
  void initLcpLocAcc(Signal* signal, Uint32 fragId);
  void initLcpLocTup(Signal* signal, Uint32 fragId);
  void releaseLocalLcps(Signal* signal);
  void seizeLcpLoc(Signal* signal);
  void sendAccContOp(Signal* signal);
  void sendStartLcp(Signal* signal);
  void setLogTail(Signal* signal, Uint32 keepGci);
@@ -2283,7 +2278,6 @@ private:
  void checkNewMbyte(Signal* signal);
  void checkReadExecSr(Signal* signal);
  void checkScanTcCompleted(Signal* signal);
  void checkSrCompleted(Signal* signal);
  void closeFile(Signal* signal, LogFileRecordPtr logFilePtr, Uint32 place);
  void completedLogPage(Signal* signal, Uint32 clpType, Uint32 place);
  void deleteFragrec(Uint32 fragId);
@@ -2302,7 +2296,6 @@ private:
  void initialiseFragrec(Signal* signal);
  void initialiseGcprec(Signal* signal);
  void initialiseLcpRec(Signal* signal);
  void initialiseLcpLocrec(Signal* signal);
  void initialiseLfo(Signal* signal);
  void initialiseLogFile(Signal* signal);
  void initialiseLogPage(Signal* signal);
@@ -2346,7 +2339,6 @@ private:
  void releaseActiveCopy(Signal* signal);
  void releaseAddfragrec(Signal* signal);
  void releaseFragrec();
  void releaseLcpLoc(Signal* signal);
  void releaseOprec(Signal* signal);
  void releasePageRef(Signal* signal);
  void releaseMmPages(Signal* signal);
+110 −82
Original line number Diff line number Diff line
@@ -10923,10 +10923,39 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal)
  tabptr.i = ref->tableId;
  ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
  
  ndbrequire(lcpPtr.p->m_outstanding);
  lcpPtr.p->m_outstanding--;
  /**
   * Only BACKUP is allowed to ref LCP_PREPARE
   */
  ndbrequire(refToBlock(signal->getSendersBlockRef()) == BACKUP);
  lcpPtr.p->m_error = ref->errorCode;
  
  if (lcpPtr.p->m_outstanding == 0)
  {
    jam();
    
    if(lcpPtr.p->firstFragmentFlag)
    {
      jam();
      LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
      lcpPtr.p->firstFragmentFlag= false;
      *ord = lcpPtr.p->currentFragment.lcpFragOrd;
      EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
      jamEntry();
      
      /**
       * First fragment mean that last LCP is complete :-)
       */
      EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
      jamEntry();
    }
    
    lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
  lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED;
    contChkpNextFragLab(signal);
  }
}
/* --------------------------------------------------------------------------
 *       PRECONDITION: LCP_PTR:LCP_STATE = WAIT_FRAGID
@@ -10946,14 +10975,45 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
  fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
  c_fragment_pool.getPtr(fragptr);
  if (refToBlock(signal->getSendersBlockRef()) != PGMAN)
  {
    ndbrequire(conf->tableId == fragptr.p->tabRef);
    ndbrequire(conf->fragmentId == fragptr.p->fragId);
  }
  
  lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
  lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::WAIT_LCPHOLDOP;
  ndbrequire(lcpPtr.p->m_outstanding);
  lcpPtr.p->m_outstanding--;
  if (lcpPtr.p->m_outstanding == 0)
  {
    jam();
    if(lcpPtr.p->firstFragmentFlag)
    {
      jam();
      LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
      lcpPtr.p->firstFragmentFlag= false;
      *ord = lcpPtr.p->currentFragment.lcpFragOrd;
      EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
      jamEntry();
      
      /**
       * First fragment mean that last LCP is complete :-)
       */
      EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
      jamEntry();
    }
    
    if (lcpPtr.p->m_error)
    {
      jam();
      lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
      contChkpNextFragLab(signal);
      return;
    }
    lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
    lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
  lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::HOLDOP_READY;
    
    /* ----------------------------------------------------------------------
     *    UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE
@@ -10972,21 +11032,6 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
      *ord = lcpPtr.p->currentFragment.lcpFragOrd;
      EXECUTE_DIRECT(DBTUP, GSN_LCP_FRAG_ORD, signal, signal->length());
      jamEntry();
    if(lcpPtr.p->firstFragmentFlag)
    {
      jam();
      lcpPtr.p->firstFragmentFlag= false;
      *ord = lcpPtr.p->currentFragment.lcpFragOrd;
      EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
      jamEntry();
      /**
       * First fragment mean that last LCP is complete :-)
       */
      EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
      jamEntry();
    }
    }
    
    BackupFragmentReq* req= (BackupFragmentReq*)signal->getDataPtr();
@@ -11008,9 +11053,7 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
      sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal, 
		 BackupFragmentReq::SignalLength, JBB);
    }
  
  lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_STARTED;
  lcpPtr.p->m_tup.lcpLocstate = LcpLocRecord::TUP_COMPLETED;
  }
}
void Dblqh::execBACKUP_FRAGMENT_REF(Signal* signal) 
@@ -11025,8 +11068,7 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Signal* signal)
  lcpPtr.i = 0;
  ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
  ndbrequire(lcpPtr.p->m_acc.lcpLocstate == LcpLocRecord::ACC_STARTED);
  lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED;
  ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_START_CHKP);
  lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
  /* ------------------------------------------------------------------------
@@ -11143,6 +11185,9 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
    return;
  }
  lcpPtr.p->m_error = 0;
  lcpPtr.p->m_outstanding = 1;
  ndbrequire(tabPtr.p->tableStatus == Tablerec::TABLE_DEFINED);
  
  lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_FRAGID;
@@ -11157,6 +11202,13 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
  req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
  sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal, 
	     LcpPrepareReq::SignalLength, JBB);
  if (lcpPtr.p->firstFragmentFlag)
  {
    lcpPtr.p->m_outstanding++;
    sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal, 
	       LcpPrepareReq::SignalLength, JBB);
  }
}//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
@@ -13600,7 +13652,7 @@ void Dblqh::execRESTORE_LCP_CONF(Signal* signal)
    lcpPtr.p->m_outstanding = 1;
    
    signal->theData[0] = c_lcpId;
    sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
    sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
    return;
  }
}
@@ -13654,7 +13706,7 @@ void Dblqh::execSTART_RECREQ(Signal* signal)
    lcpPtr.p->m_outstanding = 1;
    
    signal->theData[0] = c_lcpId;
    sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
    sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
  }//if
}//Dblqh::execSTART_RECREQ()
@@ -13680,13 +13732,14 @@ void Dblqh::execSTART_RECCONF(Signal* signal)
  switch(refToBlock(sender)){
  case TSMAN:
    jam();
    break;
  case LGMAN:
    jam();
    lcpPtr.p->m_outstanding++;
    signal->theData[0] = c_lcpId;
    sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
    sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
    return;
  case LGMAN:
    jam();
    break;
  default:
    ndbrequire(false);
@@ -15781,29 +15834,6 @@ void Dblqh::checkScanTcCompleted(Signal* signal)
  }//if
}//Dblqh::checkScanTcCompleted()
/* ========================================================================== 
 * === CHECK IF ALL PARTS OF A SYSTEM RESTART ON A FRAGMENT ARE COMPLETED === 
 *
 *       SUBROUTINE SHORT NAME = CSC
 * ========================================================================= */
void Dblqh::checkSrCompleted(Signal* signal) 
{
  terrorCode = ZOK;
  ptrGuard(lcpPtr);
  if(lcpPtr.p->m_acc.lcpLocstate != LcpLocRecord::SR_ACC_COMPLETED)
  {
    ndbrequire(lcpPtr.p->m_acc.lcpLocstate == LcpLocRecord::SR_ACC_STARTED);
    return;
  }
  
  if(lcpPtr.p->m_tup.lcpLocstate != LcpLocRecord::SR_TUP_COMPLETED) 
  {
    ndbrequire(lcpPtr.p->m_tup.lcpLocstate == LcpLocRecord::SR_TUP_STARTED);
    return;
  }
  lcpPtr.p->lcpState = LcpRecord::LCP_SR_COMPLETED;
}//Dblqh::checkSrCompleted()
/* ------------------------------------------------------------------------- */
/* ------       CLOSE A FILE DURING EXECUTION OF FRAGMENT LOG        ------- */
/*                                                                           */
@@ -18116,10 +18146,8 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
    // Print information about the current local checkpoint
    TlcpPtr.i = 0;
    ptrAss(TlcpPtr, lcpRecord);
    infoEvent(" lcpState=%d", TlcpPtr.p->lcpState);
    infoEvent(" lcpAccptr=%d lastFragmentFlag=%d",
	      TlcpPtr.p->m_acc.lcpRef,
	      TlcpPtr.p->lastFragmentFlag);
    infoEvent(" lcpState=%d lastFragmentFlag=%d", 
	      TlcpPtr.p->lcpState, TlcpPtr.p->lastFragmentFlag);
    infoEvent("currentFragment.fragPtrI=%d",
	      TlcpPtr.p->currentFragment.fragPtrI);
    infoEvent("currentFragment.lcpFragOrd.tableId=%d",
+10 −5
Original line number Diff line number Diff line
@@ -2600,7 +2600,7 @@ private:
  void disk_page_prealloc_callback_common(Signal*, 
					  Ptr<Page_request>, 
					  Ptr<Fragrecord>,
					  Ptr<GlobalPage>);
					  Ptr<Page>);
  
  void disk_page_alloc(Signal*, 
		       Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
@@ -2631,18 +2631,22 @@ private:

  void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);

  void disk_page_set_dirty(Ptr<Page>);
  void restart_setup_page(Ptr<Page>);
  void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
  
  /**
   * Disk restart code
   */
public:
  int disk_page_load_hook(Uint32 page_id);
  
  void disk_page_unmap_callback(Uint32 page_id);
  void disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count);
  
  int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId, 
				const Local_key* key, Uint32 pages);
  void disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
			      const Local_key*, Uint32 old_bits, Uint32 bits);
			      const Local_key*, Uint32 bits);
  void disk_restart_undo(Signal* signal, Uint64 lsn,
			 Uint32 type, const Uint32 * ptr, Uint32 len);

@@ -2654,6 +2658,7 @@ public:
    Ptr<Tablerec> m_table_ptr;
    Ptr<Fragrecord> m_fragment_ptr;
    Ptr<Page> m_page_ptr;
    Ptr<Extent_info> m_extent_ptr;
    Local_key m_key;
  };
  
@@ -2664,7 +2669,7 @@ private:
  void disk_restart_undo_alloc(Apply_undo*);
  void disk_restart_undo_update(Apply_undo*);
  void disk_restart_undo_free(Apply_undo*);
  void disk_restart_undo_page_bits(Apply_undo*);
  void disk_restart_undo_page_bits(Signal*, Apply_undo*);

#ifdef VM_TRACE
  void verify_page_lists(Disk_alloc_info&);
+4 −2
Original line number Diff line number Diff line
@@ -327,6 +327,8 @@ Dbtup::disk_page_commit_callback(Signal* signal,
  regOperPtr.p->m_commit_disk_callback_page= page_id;
  m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
  
  disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
  
  execTUP_COMMITREQ(signal);
  if(signal->theData[0] == 0)
    c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
@@ -477,8 +479,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
     * the page hot.   XXX move to TUP which knows better.
     */
    int flags= regOperPtr.p->op_struct.op_type |
      Page_cache_client::COMMIT_REQ | Page_cache_client::STRICT_ORDER |
      Page_cache_client::CORR_REQ;
      Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
    int res= m_pgman.get_page(signal, req, flags);
    switch(res){
    case 0:
@@ -491,6 +492,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
      ndbrequire("NOT YET IMPLEMENTED" == 0);
      break;
    }
    disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
    regOperPtr.p->m_commit_disk_callback_page= res;
    regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
  } 
Loading