Commit e18f341b authored by msvensson@pilot.mysql.com's avatar msvensson@pilot.mysql.com
Browse files

Merge bk-internal:/home/bk/mysql-5.1-new-ndb

into  pilot.mysql.com:/data/msvensson/mysql/mysql-5.1-ndb
parents fa449d58 1025aaa9
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ Transporter::Transporter(TransporterRegistry &t_reg,
  signalIdUsed    = _signalId;

  m_connected     = false;
  m_timeOutMillis = 1000;
  m_timeOutMillis = 30000;

  m_connect_address.s_addr= 0;
  if(s_port<0)
@@ -101,7 +101,7 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {

  if(m_connected)
  {
    DBUG_RETURN(true); // TODO assert(0);
    DBUG_RETURN(false); // TODO assert(0);
  }
  
  {
+2 −1
Original line number Diff line number Diff line
@@ -758,7 +758,8 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
    TCP_Transporter * t = theTCPTransporters[i];
    
    // If the transporter is connected
    if (t->isConnected()) {
    NodeId nodeId = t->getRemoteNodeId();
    if (is_connected(nodeId) && t->isConnected()) {
      
      const NDB_SOCKET_TYPE socket = t->getSocket();
      // Find the highest socket value. It will be used by select
+7 −12
Original line number Diff line number Diff line
@@ -585,7 +585,6 @@ public:

    enum ExecSrStatus {
      IDLE = 0,
      ACTIVE_REMOVE_AFTER = 1,
      ACTIVE = 2
    };
    /**
@@ -869,11 +868,6 @@ public:
     *       heard of.
     */
    Uint8 fragDistributionKey;
    /**
     *       The identity of the next local checkpoint this fragment
     *       should perform.
     */
    Uint8 nextLcp;
   /**
     *       How many local checkpoints does the fragment contain
     */
@@ -2097,10 +2091,6 @@ private:
  void execEXEC_SRCONF(Signal* signal);
  void execREAD_PSEUDO_REQ(Signal* signal);

  void build_acc(Signal*, Uint32 fragPtrI);
  void execBUILDINDXREF(Signal*signal);
  void execBUILDINDXCONF(Signal*signal);
  
  void execDUMP_STATE_ORD(Signal* signal);
  void execACC_ABORTCONF(Signal* signal);
  void execNODE_FAILREP(Signal* signal);
@@ -2780,7 +2770,13 @@ private:
/*THIS VARIABLE KEEPS TRACK OF HOW MANY FRAGMENTS THAT PARTICIPATE IN        */
/*EXECUTING THE LOG. IF ZERO WE DON'T NEED TO EXECUTE THE LOG AT ALL.        */
/* ------------------------------------------------------------------------- */
  UintR cnoFragmentsExecSr;
  Uint32 cnoFragmentsExecSr;

  /**
   * This is no of sent GSN_EXEC_FRAGREQ during this log phase
   */
  Uint32 cnoOutstandingExecFragReq;

/* ------------------------------------------------------------------------- */
/*THIS VARIABLE KEEPS TRACK OF WHICH OF THE FIRST TWO RESTART PHASES THAT    */
/*HAVE COMPLETED.                                                            */
@@ -2801,7 +2797,6 @@ private:
  DLFifoList<Fragrecord> c_lcp_waiting_fragments;  // StartFragReq'ed
  DLFifoList<Fragrecord> c_lcp_restoring_fragments; // Restoring as we speek
  DLFifoList<Fragrecord> c_lcp_complete_fragments;  // Restored
  DLFifoList<Fragrecord> c_redo_complete_fragments; // Redo'ed
  
/* ------------------------------------------------------------------------- */
/*USED DURING SYSTEM RESTART, INDICATES THE OLDEST GCI THAT CAN BE RESTARTED */
+0 −4
Original line number Diff line number Diff line
@@ -168,7 +168,6 @@ Dblqh::Dblqh(Block_context& ctx):
  c_lcp_waiting_fragments(c_fragment_pool),
  c_lcp_restoring_fragments(c_fragment_pool),
  c_lcp_complete_fragments(c_fragment_pool),
  c_redo_complete_fragments(c_fragment_pool),
  m_commitAckMarkerHash(m_commitAckMarkerPool),
  c_scanTakeOverHash(c_scanRecordPool)
{
@@ -295,9 +294,6 @@ Dblqh::Dblqh(Block_context& ctx):

  addRecSignal(GSN_READ_PSEUDO_REQ, &Dblqh::execREAD_PSEUDO_REQ);

  addRecSignal(GSN_BUILDINDXREF, &Dblqh::execBUILDINDXREF);
  addRecSignal(GSN_BUILDINDXCONF, &Dblqh::execBUILDINDXCONF);

  addRecSignal(GSN_DEFINE_BACKUP_REF, &Dblqh::execDEFINE_BACKUP_REF);
  addRecSignal(GSN_DEFINE_BACKUP_CONF, &Dblqh::execDEFINE_BACKUP_CONF);

+49 −165
Original line number Diff line number Diff line
@@ -356,7 +356,6 @@ void Dblqh::execCONTINUEB(Signal* signal)
    break;
  case ZSR_PHASE3_START:
    jam();
    signal->theData[0] = data0;
    srPhase3Start(signal);
    return;
    break;
@@ -428,25 +427,25 @@ void Dblqh::execCONTINUEB(Signal* signal)
    if (fragptr.i != RNIL)
    {
      jam();
      c_redo_complete_fragments.getPtr(fragptr);
      c_lcp_complete_fragments.getPtr(fragptr);
      signal->theData[0] = fragptr.p->tabRef;
      signal->theData[1] = fragptr.p->fragId;
      sendSignal(DBACC_REF, GSN_EXPANDCHECK2, signal, 2, JBB);
      Ptr<Fragrecord> save = fragptr;
      c_redo_complete_fragments.next(fragptr);
      c_lcp_complete_fragments.next(fragptr);
      signal->theData[0] = ZENABLE_EXPAND_CHECK;
      signal->theData[1] = fragptr.i;
      sendSignal(DBLQH_REF, GSN_CONTINUEB, signal, 2, JBB);	
      c_redo_complete_fragments.remove(save);
      c_lcp_complete_fragments.remove(save);
      return;
    }
    else
    {
      jam();
      cstartRecReq = 2;
      ndbrequire(c_redo_complete_fragments.isEmpty());
      ndbrequire(c_lcp_complete_fragments.isEmpty());
      StartRecConf * conf = (StartRecConf*)signal->getDataPtrSend();
      conf->startingNodeId = getOwnNodeId();
      sendSignal(cmasterDihBlockref, GSN_START_RECCONF, signal, 
@@ -1121,7 +1120,6 @@ void Dblqh::execLQHFRAGREQ(Signal* signal)
  Uint32 minRowsHigh = req->minRowsHigh;
  Uint32 tschemaVersion = req->schemaVersion;
  Uint32 ttupKeyLength = req->keyLength;
  Uint32 nextLcp = req->nextLCP;
  Uint32 noOfKeyAttr = req->noOfKeyAttr;
  Uint32 noOfCharsets = req->noOfCharsets;
  Uint32 checksumIndicator = req->checksumIndicator;
@@ -1214,7 +1212,6 @@ void Dblqh::execLQHFRAGREQ(Signal* signal)
    fragptr.p->lcpFlag = Fragrecord::LCP_STATE_FALSE;
  }//if
  
  fragptr.p->nextLcp = nextLcp; 
//----------------------------------------------
// For node restarts it is not necessarily zero 
//----------------------------------------------
@@ -8939,6 +8936,9 @@ void Dblqh::storedProcConfScanLab(Signal* signal)
  case Fragrecord::REMOVING:
    jam();
  default:
    jamLine(fragptr.p->fragStatus);
    ndbout_c("fragptr.p->fragStatus: %u",
             fragptr.p->fragStatus);
    ndbrequire(false);
    break;
  }//switch
@@ -14141,15 +14141,12 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
  if (lcpNo == (MAX_LCP_STORED - 1)) {
    jam();
    fragptr.p->lcpId[lcpNo] = lcpId;
    fragptr.p->nextLcp = 0;
  } else if (lcpNo < (MAX_LCP_STORED - 1)) {
    jam();
    fragptr.p->lcpId[lcpNo] = lcpId;
    fragptr.p->nextLcp = lcpNo + 1;
  } else {
    ndbrequire(lcpNo == ZNIL);
    jam();
    fragptr.p->nextLcp = 0;
  }//if
  fragptr.p->srNoLognodes = noOfLogNodes;
  fragptr.p->logFlag = Fragrecord::STATE_FALSE;
@@ -14181,19 +14178,9 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
     */
    c_lcp_complete_fragments.add(fragptr);
    if(lcpNo == ZNIL)
    {
    signal->theData[0] = tabptr.i;
    signal->theData[1] = fragId;
    sendSignal(DBACC_REF, GSN_EXPANDCHECK2, signal, 2, JBB);
    }
    if (getNodeState().getNodeRestartInProgress())
    {
      jam();
      fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;	
    }
    c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL);
    jamEntry();
    return;
@@ -14395,65 +14382,9 @@ void Dblqh::execSTART_RECCONF(Signal* signal)
    return;
  }
  c_lcp_complete_fragments.first(fragptr);
  build_acc(signal, fragptr.i);
  return;
}//Dblqh::execSTART_RECCONF()
void
Dblqh::build_acc(Signal* signal, Uint32 fragPtrI)
{
  fragptr.i = fragPtrI;
  while(fragptr.i != RNIL)
  {
    c_lcp_complete_fragments.getPtr(fragptr);
    tabptr.i = fragptr.p->tabRef;
    ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
    
    if(true || fragptr.i != tabptr.p->fragrec[0])
    {
      // Only need to send 1 build per table, TUP will rebuild all
      fragptr.i = fragptr.p->nextList;
      continue;
    }
    BuildIndxReq* const req = (BuildIndxReq*)signal->getDataPtrSend();
    req->setUserRef(reference());
    req->setConnectionPtr(fragptr.i);
    req->setRequestType(BuildIndxReq::RT_SYSTEMRESTART);
    req->setBuildId(0);   // not used
    req->setBuildKey(0);  // not used
    req->setIndexType(RNIL);
    req->setIndexId(RNIL);
    req->setTableId(tabptr.i);
    req->setParallelism(0);
    sendSignal(DBTUP_REF, GSN_BUILDINDXREQ, signal, 
	       BuildIndxReq::SignalLength, JBB);
    return;
  }
  startExecSr(signal);
}
void
Dblqh::execBUILDINDXREF(Signal* signal)
{
  ndbrequire(false);
}
void
Dblqh::execBUILDINDXCONF(Signal* signal)
{
  BuildIndxConf* conf = (BuildIndxConf*)signal->getDataPtrSend();    
  Uint32 fragPtrI = conf->getConnectionPtr();
  fragptr.i = fragPtrI;
  c_fragment_pool.getPtr(fragptr);
  infoEvent("LQH: primary key index %u rebuild done", fragptr.p->tabRef);
  build_acc(signal, fragptr.p->nextList);
}
/* ***************> */
/*  START_RECREF  > */
/* ***************> */
@@ -14472,9 +14403,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* signal)
  fragptr.i = signal->theData[0];
  Uint32 next = RNIL;
  
  if (fragptr.i == RNIL) {
  if (fragptr.i == RNIL) 
  {
    jam();
    ndbrequire(cnoOfNodes < MAX_NDB_NODES);
    /* ----------------------------------------------------------------------
     *    NO MORE FRAGMENTS TO START EXECUTING THE LOG ON.
     *    SEND EXEC_SRREQ TO ALL LQH TO INDICATE THAT THIS NODE WILL 
@@ -14490,10 +14421,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* signal)
  } else {
    jam();
    c_lcp_complete_fragments.getPtr(fragptr);
    if (fragptr.p->srNoLognodes > csrPhasesCompleted) {
    next = fragptr.p->nextList;
    if (fragptr.p->srNoLognodes > csrPhasesCompleted) 
    {
      jam();
      cnoOutstandingExecFragReq++;
      
      Uint32 index = csrPhasesCompleted;
      arrGuard(index, 4);
      arrGuard(index, MAX_LOG_EXEC);
      BlockReference ref = calcLqhBlockRef(fragptr.p->srLqhLognode[index]);
      fragptr.p->srStatus = Fragrecord::SS_STARTED;
@@ -14512,34 +14448,7 @@ void Dblqh::execSTART_EXEC_SR(Signal* signal)
      sendSignal(ref, GSN_EXEC_FRAGREQ, signal, 
		 ExecFragReq::SignalLength, JBB);
      next = fragptr.p->nextList;
    } else {
      jam();
      /* --------------------------------------------------------------------
       *  THIS FRAGMENT IS NOW FINISHED WITH THE SYSTEM RESTART. IT DOES 
       *  NOT NEED TO PARTICIPATE IN ANY MORE PHASES. REMOVE IT FROM THE 
       *  LIST OF COMPLETED FRAGMENTS TO EXECUTE THE LOG ON.
       *  ALSO SEND START_FRAGCONF TO DIH AND SET THE STATE TO ACTIVE ON THE
       *  FRAGMENT.
       * ------------------------------------------------------------------- */
      next = fragptr.p->nextList;
      c_lcp_complete_fragments.remove(fragptr);
      c_redo_complete_fragments.add(fragptr);
      
      if (!getNodeState().getNodeRestartInProgress())
      {
	fragptr.p->logFlag = Fragrecord::STATE_TRUE;
	fragptr.p->fragStatus = Fragrecord::FSACTIVE;
      } 
      else
      {	
	fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;	
    }
      signal->theData[0] = fragptr.p->srUserptr;
      signal->theData[1] = cownNodeid;
      sendSignal(fragptr.p->srBlockref, GSN_START_FRAGCONF, signal, 2, JBB);
      
    } //if
    signal->theData[0] = next;
    sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
  }//if
@@ -14560,24 +14469,8 @@ void Dblqh::execEXEC_FRAGREQ(Signal* signal)
  tabptr.i = execFragReq->tableId;
  Uint32 fragId = execFragReq->fragId;
  ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
  if (!getFragmentrec(signal, fragId)) {
    jam();
    if (!insertFragrec(signal, fragId)) {
      jam();
      sendExecFragRefLab(signal);
      return;
    }//if    
    initFragrec(signal, tabptr.i, fragId, ZLOG_NODE);
    fragptr.p->execSrStatus = Fragrecord::ACTIVE_REMOVE_AFTER;
  } else {
    jam();
    if (fragptr.p->execSrStatus == Fragrecord::ACTIVE_REMOVE_AFTER) {
      jam();
      fragptr.p->execSrStatus = Fragrecord::ACTIVE_REMOVE_AFTER;
    } else {
      jam();
    }//if
  }//if
  ndbrequire(getFragmentrec(signal, fragId));
  ndbrequire(fragptr.p->execSrNoReplicas < 4);
  fragptr.p->execSrBlockref[fragptr.p->execSrNoReplicas] = execFragReq->userRef;
  fragptr.p->execSrUserptr[fragptr.p->execSrNoReplicas] = execFragReq->userPtr;
@@ -14610,6 +14503,21 @@ void Dblqh::execEXEC_FRAGCONF(Signal* signal)
  fragptr.i = signal->theData[0];
  c_fragment_pool.getPtr(fragptr);
  fragptr.p->srStatus = Fragrecord::SS_COMPLETED;
  ndbrequire(cnoOutstandingExecFragReq);
  cnoOutstandingExecFragReq--;
  if (fragptr.p->srNoLognodes == csrPhasesCompleted + 1)
  {
    jam();
    
    fragptr.p->logFlag = Fragrecord::STATE_TRUE;
    fragptr.p->fragStatus = Fragrecord::FSACTIVE;
    
    signal->theData[0] = fragptr.p->srUserptr;
    signal->theData[1] = cownNodeid;
    sendSignal(fragptr.p->srBlockref, GSN_START_FRAGCONF, signal, 2, JBB);
  }
  
  return;
}//Dblqh::execEXEC_FRAGCONF()
@@ -14633,6 +14541,7 @@ void Dblqh::execEXEC_SRCONF(Signal* signal)
  Uint32 nodeId = signal->theData[0];
  arrGuard(nodeId, MAX_NDB_NODES);
  m_sr_exec_sr_conf.set(nodeId);
  if (!m_sr_nodes.equal(m_sr_exec_sr_conf))
  {
    jam();
@@ -14653,16 +14562,8 @@ void Dblqh::execEXEC_SRCONF(Signal* signal)
   *  NOW CHECK IF ALL FRAGMENTS IN THIS PHASE HAVE COMPLETED. IF SO START THE
   *  NEXT PHASE.
   * ----------------------------------------------------------------------- */
  c_lcp_complete_fragments.first(fragptr);
  while (fragptr.i != RNIL)
  {
    jam();
    if(fragptr.p->srStatus != Fragrecord::SS_COMPLETED)
    {
      return;
    }
    c_lcp_complete_fragments.next(fragptr);
  } 
  ndbrequire(cnoOutstandingExecFragReq == 0);
  execSrCompletedLab(signal);
  return;
}//Dblqh::execEXEC_SRCONF()
@@ -14718,6 +14619,7 @@ void Dblqh::execSrCompletedLab(Signal* signal)
     *   THERE ARE YET MORE PHASES TO RESTART.
     *   WE MUST INITIALISE DATA FOR NEXT PHASE AND SEND START SIGNAL.
     * --------------------------------------------------------------------- */
    csrPhaseStarted = ZSR_PHASE1_COMPLETED; // Set correct state first...
    startExecSr(signal);
  }//if
  return;
@@ -14791,7 +14693,8 @@ void Dblqh::srPhase3Start(Signal* signal)
  UintR tsrPhaseStarted;
  
  jamEntry();
  tsrPhaseStarted = signal->theData[0];
  tsrPhaseStarted = signal->theData[1];
  if (csrPhaseStarted == ZSR_NO_PHASE_STARTED) {
    jam();
    csrPhaseStarted = tsrPhaseStarted;
@@ -15968,18 +15871,6 @@ void Dblqh::sendExecConf(Signal* signal)
        sendSignal(fragptr.p->execSrBlockref[i], GSN_EXEC_FRAGCONF, 
		   signal, 1, JBB);
      }//for
      if (fragptr.p->execSrStatus == Fragrecord::ACTIVE) {
        jam();
        fragptr.p->execSrStatus = Fragrecord::IDLE;
      } else {
        ndbrequire(fragptr.p->execSrStatus == Fragrecord::ACTIVE_REMOVE_AFTER);
        jam();
        Uint32 fragId = fragptr.p->fragId;
        tabptr.i = fragptr.p->tabRef;
        ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
	c_lcp_complete_fragments.remove(fragptr);
        deleteFragrec(fragId);
      }//if
      fragptr.p->execSrNoReplicas = 0;
    }//if
    loopCount++;
@@ -16007,17 +15898,10 @@ void Dblqh::sendExecConf(Signal* signal)
void Dblqh::srPhase3Comp(Signal* signal) 
{
  jamEntry();
  ndbrequire(cnoOfNodes < MAX_NDB_NODES);
  for (Uint32 i = 0; i < cnoOfNodes; i++) {
    jam();
    if (cnodeStatus[i] == ZNODE_UP) {
      jam();
      ndbrequire(cnodeData[i] < MAX_NDB_NODES);
      BlockReference ref = calcLqhBlockRef(cnodeData[i]);
  signal->theData[0] = cownNodeid;
      sendSignal(ref, GSN_EXEC_SRCONF, signal, 1, JBB);
    }//if
  }//for
  NodeReceiverGroup rg(DBLQH, m_sr_nodes);
  sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
  return;
}//Dblqh::srPhase3Comp()
@@ -16259,7 +16143,7 @@ void Dblqh::srFourthComp(Signal* signal)
    if(cstartType == NodeState::ST_SYSTEM_RESTART)
    {
      jam();
      if (c_redo_complete_fragments.first(fragptr))
      if (c_lcp_complete_fragments.first(fragptr))
      {
	jam();
        signal->theData[0] = ZENABLE_EXPAND_CHECK;
@@ -17367,7 +17251,6 @@ void Dblqh::initFragrec(Signal* signal,
  fragptr.p->maxGciInLcp = 0;
  fragptr.p->copyFragState = ZIDLE;
  fragptr.p->newestGci = cnewestGci;
  fragptr.p->nextLcp = 0;
  fragptr.p->tabRef = tableId;
  fragptr.p->fragId = fragId;
  fragptr.p->srStatus = Fragrecord::SS_IDLE;
@@ -18456,6 +18339,7 @@ void Dblqh::sendLqhTransconf(Signal* signal, LqhTransConf::OperationStatus stat)
void Dblqh::startExecSr(Signal* signal) 
{
  cnoFragmentsExecSr = 0;
  cnoOutstandingExecFragReq = 0;
  c_lcp_complete_fragments.first(fragptr);
  signal->theData[0] = fragptr.i;
  sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
Loading