Commit d5fad326 authored by unknown's avatar unknown
Browse files

ndb dd - fix bug with deletes during LCP, that link between mm and dd could get lost


storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  Introduce LCP keep list, for maintaining snapshot of LCP start
    list of tuples deleted during checkpoint
storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp:
  Change meaning of Tuple_header::FREE
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  Handle LCP_KEEP/FREED/FREE on commit
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  Introduce LCP keep list, for maintaining snapshot of LCP start
    list of tuples deleted during checkpoint
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  fix scanning lcp keep list
parent a8ea9cd1
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -604,6 +604,7 @@ struct Fragrecord {
  SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map
  
  Uint32 m_lcp_scan_op;
  Uint32 m_lcp_keep_list;

  State fragStatus;
  Uint32 fragTableId;
@@ -1194,8 +1195,10 @@ typedef Ptr<HostBuffer> HostBufferPtr;
    STATIC_CONST( ALLOC       = 0x00100000 ); // Is record allocated now
    STATIC_CONST( MM_SHRINK   = 0x00200000 ); // Has MM part shrunk
    STATIC_CONST( MM_GROWN    = 0x00400000 ); // Has MM part grown
    STATIC_CONST( FREE        = 0x00800000 ); // On free list of page
    STATIC_CONST( FREED       = 0x00800000 ); // Is freed
    STATIC_CONST( LCP_SKIP    = 0x01000000 ); // Should not be returned in LCP
    STATIC_CONST( LCP_KEEP    = 0x02000000 ); // Should be returned in LCP
    STATIC_CONST( FREE        = 0x02800000 ); // Is free
    
    Uint32 get_tuple_version() const { 
      return m_header_bits & TUP_VERSION_MASK;
+1 −1
Original line number Diff line number Diff line
@@ -172,7 +172,7 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal)
       */
      ndbout_c("clearing ALLOC");
      tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC;
      tuple_ptr->m_header_bits |= Tuple_header::FREE;
      tuple_ptr->m_header_bits |= Tuple_header::FREED;
    }
  }
  
+42 −12
Original line number Diff line number Diff line
@@ -52,6 +52,15 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
    PagePtr pagePtr;
    Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);

    ndbassert(ptr->m_header_bits & Tuple_header::FREE);

    if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
    {
      ndbassert(ptr.p->m_header_bits & Tuple_header::FREED);
      ptr->m_header_bits |= Tuple_header::FREED;
      return;
    }
    
    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
    {
      ljam();
@@ -140,6 +149,15 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
  regOperPtr->m_undo_buffer_space= 0;
}

static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
  return key1.m_page_no > key2.m_page_no ||
    (key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}

void
Dbtup::dealloc_tuple(Signal* signal,
		     Uint32 gci,
@@ -149,8 +167,12 @@ Dbtup::dealloc_tuple(Signal* signal,
		     Fragrecord* regFragPtr, 
		     Tablerec* regTabPtr)
{
  ptr->m_header_bits |= Tuple_header::FREE;
  if (ptr->m_header_bits & Tuple_header::DISK_PART)
  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
  Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;

  Uint32 bits = ptr->m_header_bits;
  Uint32 extra_bits = Tuple_header::FREED;
  if (bits & Tuple_header::DISK_PART)
  {
    Local_key disk;
    memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
@@ -158,20 +180,28 @@ Dbtup::dealloc_tuple(Signal* signal,
		   &disk, *(PagePtr*)&m_pgman.m_ptr, gci);
  }
  
  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
  if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
  {
    jam();
    * ptr->get_mm_gci(regTabPtr) = gci;
    ScanOpPtr scanOp;
    c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
    Local_key rowid = regOperPtr->m_tuple_location;
    Local_key scanpos = scanOp.p->m_scanPos.m_key;
    rowid.m_page_no = page->frag_page_id;
    if (rowid >= scanpos)
    {
      extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
      ptr->m_operation_ptr_i = lcp_keep_list;
      regFragPtr->m_lcp_keep_list = rowid.ref();
    }
  }
  
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
  ptr->m_header_bits = bits | extra_bits;
  
  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
  {
  return key1.m_page_no > key2.m_page_no ||
    (key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
    jam();
    * ptr->get_mm_gci(regTabPtr) = gci;
  }
}

void
+3 −2
Original line number Diff line number Diff line
@@ -139,6 +139,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
  regFragPtr.p->m_tablespace_id= tablespace;
  regFragPtr.p->m_undo_complete= false;
  regFragPtr.p->m_lcp_scan_op = RNIL; 
  regFragPtr.p->m_lcp_keep_list = RNIL;
  
  Uint32 noAllocatedPages= allocFragPages(regFragPtr.p, pages);
  
+53 −0
Original line number Diff line number Diff line
@@ -587,8 +587,14 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
  Uint32 foundGCI;
 
  bool mm = (bits & ScanOp::SCAN_DD);
  bool lcp = (bits & ScanOp::SCAN_LCP);
  Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
  Uint32 size = table.m_offsets[mm].m_fix_header_size +
    (bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);

  if (lcp && lcp_list != RNIL)
    goto found_lcp_keep;
  
  while (true) {
    switch (pos.m_get) {
    case ScanPos::Get_next_page:
@@ -864,6 +870,53 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
  signal->theData[1] = scanPtr.i;
  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
  return false;

found_lcp_keep:
  Local_key tmp;
  tmp.assref(lcp_list);
  tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
  
  Ptr<Page> pagePtr;
  c_page_pool.getPtr(pagePtr, tmp.m_page_no);
  Tuple_header* ptr = (Tuple_header*)
    ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
  Uint32 headerbits = ptr->m_header_bits;
  ndbrequire(headerbits & Tuple_header::LCP_KEEP);
  
  Uint32 next = ptr->m_operation_ptr_i;
  ptr->m_operation_ptr_i = RNIL;
  ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
  
  if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
    jam();
    setChecksum(ptr, tablePtr.p);
  }
  
  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  conf->scanPtr = scan.m_userPtr;
  conf->accOperationPtr = RNIL + 1;
  conf->fragId = frag.fragmentId;
  conf->localKey[0] = lcp_list;
  conf->localKey[1] = 0;
  conf->localKeyLength = 1;
  conf->gci = 0;
  Uint32 blockNo = refToBlock(scan.m_userRef);
  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
  
  fragPtr.p->m_lcp_keep_list = next;
  ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
  if (headerbits & Tuple_header::FREED)
  {
    if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
    {
      jam();
      free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
    } else {
      jam();
      free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
    }
  }
  return false;
}

void