Commit 4e24154e authored by unknown's avatar unknown
Browse files

ndb - bug#24664

  1) run lcp snapshot for both MM and DD tables (so I dont have to change restore to use WRITE)
  2) fix >= and > bug in lcp skip/keep handling
  3) very cool test prog for this :-)


storage/ndb/include/kernel/signaldata/BackupContinueB.hpp:
  Add new error insert
storage/ndb/src/kernel/blocks/ERROR_codes.txt:
  Add new error insert
storage/ndb/src/kernel/blocks/backup/Backup.cpp:
  Add new error insert
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  1) get >= and > correct for lcp keep/skip
  2) always run lcp snapshot impl. (previously only for dd tables)
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  1) get >= and > correct for lcp keep/skip
  2) always run lcp snapshot impl. (previously only for dd tables)
storage/ndb/test/ndbapi/testSystemRestart.cpp:
  add testcase
storage/ndb/test/run-test/daily-basic-tests.txt:
  add testcase
parent 61956dd5
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -33,7 +33,8 @@ private:
    BUFFER_FULL_FRAG_COMPLETE = 3,
    BUFFER_FULL_META  = 4,
    BACKUP_FRAGMENT_INFO = 5,
    RESET_DISK_SPEED_COUNTER = 6
    RESET_DISK_SPEED_COUNTER = 6,
    ZDELAY_SCAN_NEXT = 7
  };
};

+4 −1
Original line number Diff line number Diff line
@@ -8,7 +8,7 @@ Next DBDICT 6007
Next DBDIH 7178
Next DBTC 8039
Next CMVMI 9000
Next BACKUP 10036
Next BACKUP 10038
Next DBUTIL 11002
Next DBTUX 12008
Next SUMA 13001
@@ -425,6 +425,9 @@ Backup Stuff:
10034: define backup reply error
10035: Fail to allocate buffers

10036: Halt backup for table >= 2
10037: Resume backup (from 10036)

11001: Send UTIL_SEQUENCE_REF (in master)

5028:  Crash when receiving LQHKEYREQ (in non-master)
+35 −0
Original line number Diff line number Diff line
@@ -356,6 +356,25 @@ Backup::execCONTINUEB(Signal* signal)
	       GetTabInfoReq::SignalLength, JBB);
    return;
  }
  case BackupContinueB::ZDELAY_SCAN_NEXT:
    if (ERROR_INSERTED(10036))
    {
      jam();
      sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 300, 
			  signal->getLength());
      return;
    }
    else
    {
      jam();
      CLEAR_ERROR_INSERT_VALUE;
      ndbout_c("Resuming backup");
      memmove(signal->theData, signal->theData + 1, 
	      4*ScanFragNextReq::SignalLength);
      sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 
		 ScanFragNextReq::SignalLength, JBB);
      return ;
    }
  default:
    ndbrequire(0);
  }//switch
@@ -3920,6 +3939,22 @@ Backup::checkScan(Signal* signal, BackupFilePtr filePtr)
    req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
    req->batch_size_rows= 16;
    req->batch_size_bytes= 0;

    if (ERROR_INSERTED(10036) && 
	filePtr.p->tableId >= 2 &&
	filePtr.p->operation.noOfRecords > 0)
    {
      ndbout_c("halting backup for table %d fragment: %d after %d records",
	       filePtr.p->tableId,
	       filePtr.p->fragmentNo,
	       filePtr.p->operation.noOfRecords);
      memmove(signal->theData+1, signal->theData, 
	      4*ScanFragNextReq::SignalLength);
      signal->theData[0] = BackupContinueB::ZDELAY_SCAN_NEXT;
      sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 
			  300, 1+ScanFragNextReq::SignalLength);
      return;
    }
    if(ERROR_INSERTED(10032))
      sendSignalWithDelay(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 
			  100, ScanFragNextReq::SignalLength);
+16 −17
Original line number Diff line number Diff line
@@ -152,10 +152,10 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
operator>(const Local_key& key1, const Local_key& key2)
{
  return key1.m_page_no > key2.m_page_no ||
    (key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
    (key1.m_page_no == key2.m_page_no && key1.m_page_idx > key2.m_page_idx);
}

void
@@ -187,7 +187,7 @@ Dbtup::dealloc_tuple(Signal* signal,
    Local_key rowid = regOperPtr->m_tuple_location;
    Local_key scanpos = scanOp.p->m_scanPos.m_key;
    rowid.m_page_no = page->frag_page_id;
    if (rowid >= scanpos)
    if (rowid > scanpos)
    {
      extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
      ptr->m_operation_ptr_i = lcp_keep_list;
@@ -215,6 +215,7 @@ Dbtup::commit_operation(Signal* signal,
{
  ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
  
  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
  Uint32 save= tuple_ptr->m_operation_ptr_i;
  Uint32 bits= tuple_ptr->m_header_bits;

@@ -264,7 +265,6 @@ Dbtup::commit_operation(Signal* signal,
    Local_key key;
    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
    Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;

    PagePtr diskPagePtr = *(PagePtr*)&m_pgman.m_ptr;
    ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
@@ -273,19 +273,6 @@ Dbtup::commit_operation(Signal* signal,
    if(copy_bits & Tuple_header::DISK_ALLOC)
    {
      disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);

      if(lcpScan_ptr_i != RNIL)
      {
	ScanOpPtr scanOp;
	c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
	Local_key rowid = regOperPtr->m_tuple_location;
	Local_key scanpos = scanOp.p->m_scanPos.m_key;
	rowid.m_page_no = pagePtr.p->frag_page_id;
	if(rowid >= scanpos)
	{
	  copy_bits |= Tuple_header::LCP_SKIP;
	}
      }
    }
    
    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
@@ -312,6 +299,18 @@ Dbtup::commit_operation(Signal* signal,
    copy_bits |= Tuple_header::DISK_PART;
  }
  
  if(lcpScan_ptr_i != RNIL)
  {
    ScanOpPtr scanOp;
    c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
    Local_key rowid = regOperPtr->m_tuple_location;
    Local_key scanpos = scanOp.p->m_scanPos.m_key;
    rowid.m_page_no = pagePtr.p->frag_page_id;
    if(rowid > scanpos)
    {
       copy_bits |= Tuple_header::LCP_SKIP;
    }
  }
  
  Uint32 clear= 
    Tuple_header::ALLOC | Tuple_header::FREE |
+18 −22
Original line number Diff line number Diff line
@@ -54,8 +54,7 @@ Dbtup::execACC_SCANREQ(Signal* signal)
    // flags
    Uint32 bits = 0;
    
    if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
	tablePtr.p->m_no_of_disk_attributes == 0)
    if (!AccScanReq::getLcpScanFlag(req->requestInfo))
    {
      // seize from pool and link to per-fragment list
      LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
@@ -1052,8 +1051,6 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
  tablePtr.i = req->tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  if(tablePtr.p->m_no_of_disk_attributes)
  {
  jam();
  FragrecordPtr fragPtr;
  Uint32 fragId = req->fragmentId;
@@ -1072,4 +1069,3 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
  scanFirst(signal, scanPtr);
  scanPtr.p->m_state = ScanOp::First;
}
}
Loading