Commit 9a5ae909 authored by jonas@perch.ndb.mysql.com's avatar jonas@perch.ndb.mysql.com
Browse files

ndb - bug#31257

    handle partially complete LCP better in SR
parent edf125a5
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -318,6 +318,7 @@ public:
    Uint8 noOfStartedChkpt;

    MasterLCPConf::State lcpStateAtTakeOver;
    Uint32 m_remove_node_from_table_lcp_id;
  };
  typedef Ptr<NodeRecord> NodeRecordPtr;
  /**********************************************************************/
+34 −1
Original line number Diff line number Diff line
@@ -4989,6 +4989,18 @@ void Dbdih::startRemoveFailedNode(Signal* signal, NodeRecordPtr failedNodePtr)
    return;
  }
  
  /**
   * If node has node complete LCP
   *   we need to remove it as undo might not be complete
   *   bug#31257
   */
  failedNodePtr.p->m_remove_node_from_table_lcp_id = RNIL;
  if (c_lcpState.m_LCP_COMPLETE_REP_Counter_LQH.isWaitingFor(failedNodePtr.i))
  {
    jam();
    failedNodePtr.p->m_remove_node_from_table_lcp_id = SYSFILE->latestLCP_ID;
  }
  
  jam();
  signal->theData[0] = DihContinueB::ZREMOVE_NODE_FROM_TABLE;
  signal->theData[1] = failedNodePtr.i;
@@ -5630,6 +5642,11 @@ void Dbdih::removeNodeFromTable(Signal* signal,
    return;
  }//if  
  NodeRecordPtr nodePtr;
  nodePtr.i = nodeId;
  ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
  const Uint32 lcpId = nodePtr.p->m_remove_node_from_table_lcp_id;
  /**
   * For each fragment
   */
@@ -5637,7 +5654,6 @@ void Dbdih::removeNodeFromTable(Signal* signal,
  Uint32 noOfRemovedLcpReplicas = 0;  // No of replicas in LCP removed 
  Uint32 noOfRemainingLcpReplicas = 0;// No of replicas in LCP remaining
  //const Uint32 lcpId = SYSFILE->latestLCP_ID;
  const bool lcpOngoingFlag = (tabPtr.p->tabLcpStatus== TabRecord::TLS_ACTIVE);
  const bool unlogged = (tabPtr.p->tabStorage != TabRecord::ST_NORMAL);
  
@@ -5672,6 +5688,23 @@ void Dbdih::removeNodeFromTable(Signal* signal,
	  noOfRemovedLcpReplicas ++;
	  replicaPtr.p->lcpOngoingFlag = false;
	}
        if (lcpId != RNIL)
        {
          jam();
          Uint32 lcpNo = prevLcpNo(replicaPtr.p->nextLcp);
          if (replicaPtr.p->lcpStatus[lcpNo] == ZVALID && 
              replicaPtr.p->lcpId[lcpNo] == SYSFILE->latestLCP_ID)
          {
            jam();
            replicaPtr.p->lcpStatus[lcpNo] = ZINVALID;       
            replicaPtr.p->lcpId[lcpNo] = 0;
            replicaPtr.p->nextLcp = lcpNo;
            ndbout_c("REMOVING lcp: %u from table: %u frag: %u node: %u",
                     SYSFILE->latestLCP_ID,
                     tabPtr.i, fragNo, nodeId);
          }
        }
      }
    }
    if (!found)