Commit edf125a5 authored by jonas@perch.ndb.mysql.com's avatar jonas@perch.ndb.mysql.com
Browse files

ndb - bug#30975 (recommit to 51-telco-gca)

    - only update extent pages *after* flush of real page has been done
    - sync both create/drop of table into undolog (for disk tables)
parent 2583dae0
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
@@ -14042,11 +14042,16 @@ void Dblqh::execSTART_FRAGREQ(Signal* signal)
      fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;	
    }
    c_tup->disk_restart_mark_no_lcp(tabptr.i, fragId);
    c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL);
    jamEntry();
    return;
  }//if
  }
  else
  {
    jam();
    c_tup->disk_restart_lcp_id(tabptr.i, fragId, lcpId);
    jamEntry();
  }
  c_lcpId = (c_lcpId == 0 ? lcpId : c_lcpId);
  c_lcpId = (c_lcpId < lcpId ? c_lcpId : lcpId);
+5 −4
Original line number Diff line number Diff line
@@ -624,7 +624,8 @@ struct Fragrecord {
  
  DLList<ScanOp>::Head m_scanList;

  enum { UC_LCP = 1, UC_CREATE = 2 };
  enum { UC_LCP = 1, UC_CREATE = 2, UC_SET_LCP = 3 };
  Uint32 m_restore_lcp_id;
  Uint32 m_undo_complete;
  Uint32 m_tablespace_id;
  Uint32 m_logfile_group_id;
@@ -2748,7 +2749,7 @@ private:
public:
  int disk_page_load_hook(Uint32 page_id);
  
  void disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count);
  void disk_page_unmap_callback(Uint32 when, Uint32 page, Uint32 dirty_count);
  
  int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId, 
				const Local_key* key, Uint32 pages);
@@ -2769,11 +2770,11 @@ public:
    Local_key m_key;
  };

  void disk_restart_mark_no_lcp(Uint32 table, Uint32 frag);
  void disk_restart_lcp_id(Uint32 table, Uint32 frag, Uint32 lcpId);
  
private:
  void disk_restart_undo_next(Signal*);
  void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
  void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag, Uint32 lcpId);
  void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
  void disk_restart_undo_alloc(Apply_undo*);
  void disk_restart_undo_update(Apply_undo*);
+191 −63
Original line number Diff line number Diff line
@@ -907,8 +907,10 @@ Dbtup::disk_page_set_dirty(PagePtr pagePtr)
}

void
Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
Dbtup::disk_page_unmap_callback(Uint32 when,
				Uint32 page_id, Uint32 dirty_count)
{
  jamEntry();
  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr;
@@ -923,16 +925,8 @@ Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
    return ;
  }

  Local_key key;
  key.m_page_no = pagePtr.p->m_page_no;
  key.m_file_no = pagePtr.p->m_file_no;
  Uint32 idx = pagePtr.p->list_index;

  ndbassert((idx & 0x8000) == 0);

  if (DBG_DISK)
    ndbout << "disk_page_unmap_callback " << key << endl;
  
  Ptr<Tablerec> tabPtr;
  tabPtr.i= pagePtr.p->m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
@@ -942,8 +936,37 @@ Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
  
  Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
  
  if (when == 0)
  {
    /**
     * Before pageout
     */
    jam();

    if (DBG_DISK)
    {
      Local_key key;
      key.m_page_no = pagePtr.p->m_page_no;
      key.m_file_no = pagePtr.p->m_file_no;
      ndbout << "disk_page_unmap_callback(before) " << key 
	     << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
    }

    ndbassert((idx & 0x8000) == 0);

    ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
    LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
    list.remove(pagePtr);

    if (dirty_count == 0)
    {
      jam();
      pagePtr.p->list_index = idx | 0x8000;      
      
      Local_key key;
      key.m_page_no = pagePtr.p->m_page_no;
      key.m_file_no = pagePtr.p->m_file_no;
      
      Uint32 free = pagePtr.p->free_space;
      Uint32 used = pagePtr.p->uncommitted_used_space;
      ddassert(free >= used);
@@ -956,12 +979,40 @@ Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
      
      tsman.unmap_page(&key, idx);
      jamEntry();
    pagePtr.p->list_index = idx | 0x8000;
    }
  }
  else if (when == 1)
  {
    /**
     * After page out
     */
    jam();

  ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
  LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
  list.remove(pagePtr);
    Local_key key;
    key.m_page_no = pagePtr.p->m_page_no;
    key.m_file_no = pagePtr.p->m_file_no;
    Uint32 real_free = pagePtr.p->free_space;
    
    if (DBG_DISK)
    {
      ndbout << "disk_page_unmap_callback(after) " << key 
	     << " cnt: " << dirty_count << " " << (idx & ~0x8000) << endl;
    }

    Tablespace_client tsman(0, c_tsman,
			    fragPtr.p->fragTableId,
			    fragPtr.p->fragmentId,
			    fragPtr.p->m_tablespace_id);
    
    if (DBG_DISK && alloc.calc_page_free_bits(real_free) != (idx & ~0x8000))
    {
      ndbout << key 
	     << " calc: " << alloc.calc_page_free_bits(real_free)
	     << " idx: " << (idx & ~0x8000)
	     << endl;
    }
    tsman.update_page_free_bits(&key, alloc.calc_page_free_bits(real_free));
  }
}

void
@@ -992,20 +1043,6 @@ Dbtup::disk_page_alloc(Signal* signal,
    
    lsn= disk_page_undo_alloc(pagePtr.p, key, sz, gci, logfile_group_id);
  }

  Uint32 new_free = pagePtr.p->free_space;
  Uint32 new_bits= alloc.calc_page_free_bits(new_free);
  
  if (old_bits != new_bits)
  {
    Tablespace_client tsman(signal, c_tsman,
			    fragPtrP->fragTableId,
			    fragPtrP->fragmentId,
			    fragPtrP->m_tablespace_id);
    
    tsman.update_page_free_bits(key, new_bits, lsn);
    jamEntry();
  }
}

void
@@ -1049,17 +1086,6 @@ Dbtup::disk_page_free(Signal *signal,
  Uint32 new_free = pagePtr.p->free_space;
  Uint32 new_bits = alloc.calc_page_free_bits(new_free);
  
  if (old_bits != new_bits)
  {
    Tablespace_client tsman(signal, c_tsman,
			    fragPtrP->fragTableId,
			    fragPtrP->fragmentId,
			    fragPtrP->m_tablespace_id);
    
    tsman.update_page_free_bits(key, new_bits, lsn);
    jamEntry();
  }

  Uint32 ext = pagePtr.p->m_extent_info_ptr;
  Uint32 used = pagePtr.p->uncommitted_used_space;
  Uint32 old_idx = pagePtr.p->list_index;
@@ -1345,15 +1371,23 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
  case File_formats::Undofile::UNDO_LCP_FIRST:
  case File_formats::Undofile::UNDO_LCP:
  {
    jam();
    ndbrequire(len == 3);
    Uint32 lcp = ptr[0];
    Uint32 tableId = ptr[1] >> 16;
    Uint32 fragId = ptr[1] & 0xFFFF;
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP);
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP, lcp);
    disk_restart_undo_next(signal);
    
    if (DBG_UNDO)
    {
      ndbout_c("UNDO LCP %u (%u, %u)", lcp, tableId, fragId);
    }
    return;
  }
  case File_formats::Undofile::UNDO_TUP_ALLOC:
  {
    jam();
    Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
    preq.m_page.m_page_no = rec->m_page_no;
    preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
@@ -1362,6 +1396,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
  }
  case File_formats::Undofile::UNDO_TUP_UPDATE:
  {
    jam();
    Disk_undo::Update* rec= (Disk_undo::Update*)ptr;
    preq.m_page.m_page_no = rec->m_page_no;
    preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
@@ -1370,6 +1405,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
  }
  case File_formats::Undofile::UNDO_TUP_FREE:
  {
    jam();
    Disk_undo::Free* rec= (Disk_undo::Free*)ptr;
    preq.m_page.m_page_no = rec->m_page_no;
    preq.m_page.m_file_no  = rec->m_file_no_page_idx >> 16;
@@ -1381,6 +1417,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
     * 
     */
  {
    jam();
    Disk_undo::Create* rec= (Disk_undo::Create*)ptr;
    Ptr<Tablerec> tabPtr;
    tabPtr.i= rec->m_table;
@@ -1388,12 +1425,34 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
      if (tabPtr.p->fragrec[i] != RNIL)
	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
			      Fragrecord::UC_CREATE);
			      Fragrecord::UC_CREATE, 0);
    disk_restart_undo_next(signal);

    if (DBG_UNDO)
    {
      ndbout_c("UNDO CREATE (%u)", tabPtr.i);
    }
    return;
  }
  case File_formats::Undofile::UNDO_TUP_DROP:
  {
    jam();
    Disk_undo::Drop* rec = (Disk_undo::Drop*)ptr;
    Ptr<Tablerec> tabPtr;
    tabPtr.i= rec->m_table;
    ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
      if (tabPtr.p->fragrec[i] != RNIL)
	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
			      Fragrecord::UC_CREATE, 0);
    disk_restart_undo_next(signal);

    if (DBG_UNDO)
    {
      ndbout_c("UNDO DROP (%u)", tabPtr.i);
    }
    return;
  }
  case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
    jam();
  case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
@@ -1402,6 +1461,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
    return;

  case File_formats::Undofile::UNDO_END:
    jam();
    f_undo_done = true;
    return;
  default:
@@ -1435,14 +1495,32 @@ Dbtup::disk_restart_undo_next(Signal* signal)
}

void
Dbtup::disk_restart_mark_no_lcp(Uint32 tableId, Uint32 fragId)
Dbtup::disk_restart_lcp_id(Uint32 tableId, Uint32 fragId, Uint32 lcpId)
{
  jamEntry();
  disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_CREATE);
  
  if (lcpId == RNIL)
  {
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_CREATE, 0);
    if (DBG_UNDO)
    {
      ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
    }
  }
  else
  {
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_SET_LCP, lcpId); 
    if (DBG_UNDO)
    {
      ndbout_c("mark_no_lcp (%u, %u)", tableId, fragId);
    }

  }
}

void
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag, 
			     Uint32 lcpId)
{
  Ptr<Tablerec> tabPtr;
  tabPtr.i= tableId;
@@ -1450,11 +1528,43 @@ Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
  
  if (tabPtr.p->tableStatus == DEFINED)
  {
    jam();
    FragrecordPtr fragPtr;
    getFragmentrec(fragPtr, fragId, tabPtr.p);
    if (!fragPtr.isNull())
    {
      jam();
      switch(flag){
      case Fragrecord::UC_CREATE:
	jam();
	fragPtr.p->m_undo_complete |= flag;
	return;
      case Fragrecord::UC_LCP:
	jam();
	if (fragPtr.p->m_undo_complete == 0 && 
	    fragPtr.p->m_restore_lcp_id == lcpId)
	{
	  jam();
	  fragPtr.p->m_undo_complete |= flag;
	  if (DBG_UNDO)
            ndbout_c("table: %u fragment: %u lcp: %u -> done", 
		     tableId, fragId, lcpId);
	}
	return;
      case Fragrecord::UC_SET_LCP:
      {
	jam();
	if (DBG_UNDO)
           ndbout_c("table: %u fragment: %u restore to lcp: %u",
		    tableId, fragId, lcpId);
	ndbrequire(fragPtr.p->m_undo_complete == 0);
	ndbrequire(fragPtr.p->m_restore_lcp_id == RNIL);
	fragPtr.p->m_restore_lcp_id = lcpId;
	return;
      }
      }
      jamLine(flag);
      ndbrequire(false);
    }
  }
}
@@ -1478,6 +1588,7 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
      pagePtr.p->nextList != RNIL ||
      pagePtr.p->prevList != RNIL)
  {
    jam();
    update = true;
    pagePtr.p->list_index |= 0x8000;
    pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
@@ -1488,6 +1599,9 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
  
  if (tableId >= cnoOfTablerec)
  {
    jam();
    if (DBG_UNDO)
      ndbout_c("UNDO table> %u", tableId);
    disk_restart_undo_next(signal);
    return;
  }
@@ -1496,6 +1610,9 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
  
  if (undo->m_table_ptr.p->tableStatus != DEFINED)
  {
    jam();
    if (DBG_UNDO)
      ndbout_c("UNDO !defined (%u) ", tableId);
    disk_restart_undo_next(signal);
    return;
  }
@@ -1503,19 +1620,25 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
  getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
  if(undo->m_fragment_ptr.isNull())
  {
    jam();
    if (DBG_UNDO)
      ndbout_c("UNDO fragment null %u/%u", tableId, fragId);
    disk_restart_undo_next(signal);
    return;
  }

  if (undo->m_fragment_ptr.p->m_undo_complete)
  {
    jam();
    if (DBG_UNDO)
      ndbout_c("UNDO undo complete %u/%u", tableId, fragId);
    disk_restart_undo_next(signal);
    return;
  }
  
  Local_key key;
  key.m_page_no = pagePtr.p->m_page_no;
  key.m_file_no = pagePtr.p->m_file_no;
  Local_key key = undo->m_key;
//  key.m_page_no = pagePtr.p->m_page_no;
//  key.m_file_no = pagePtr.p->m_file_no;
  
  Uint64 lsn = 0;
  lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
@@ -1525,6 +1648,7 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
  
  if (undo->m_lsn <= lsn)
  {
    jam();
    if (DBG_UNDO)
    {
      ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )" 
@@ -1539,12 +1663,15 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
     */
    switch(undo->m_type){
    case File_formats::Undofile::UNDO_TUP_ALLOC:
      jam();
      disk_restart_undo_alloc(undo);
      break;
    case File_formats::Undofile::UNDO_TUP_UPDATE:
      jam();
      disk_restart_undo_update(undo);
      break;
    case File_formats::Undofile::UNDO_TUP_FREE:
      jam();
      disk_restart_undo_free(undo);
      break;
    default:
@@ -1559,14 +1686,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
    
    m_pgman.update_lsn(undo->m_key, lsn);
    jamEntry();

    disk_restart_undo_page_bits(signal, undo);
  }
  else if (DBG_UNDO)
  {
    jam();
    ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )" 
	   << key << " type: " << undo->m_type << endl;
	   << key << " type: " << undo->m_type 
	   << " tab: " << tableId << endl;
  }

  disk_restart_undo_page_bits(signal, undo);
  disk_restart_undo_next(signal);
}

@@ -1641,16 +1771,12 @@ Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
  Uint32 new_bits = alloc.calc_page_free_bits(free);
  pageP->list_index = 0x8000 | new_bits;

  Uint64 lsn = 0;
  lsn += pageP->m_page_header.m_page_lsn_hi; lsn <<= 32;
  lsn += pageP->m_page_header.m_page_lsn_lo;
  
  Tablespace_client tsman(signal, c_tsman,
			  fragPtrP->fragTableId,
			  fragPtrP->fragmentId,
			  fragPtrP->m_tablespace_id);
  
  tsman.restart_undo_page_free_bits(&undo->m_key, new_bits, undo->m_lsn, lsn);
  tsman.restart_undo_page_free_bits(&undo->m_key, new_bits);
  jamEntry();
}

@@ -1687,6 +1813,7 @@ Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
      
      if (alloc.m_curr_extent_info_ptr_i != RNIL)
      {
	jam();
	Ptr<Extent_info> old;
	c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
	ndbassert(old.p->m_free_matrix_pos == RNIL);
@@ -1713,6 +1840,7 @@ void
Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
			      const Local_key*, Uint32 bits)
{
  jam();
  TablerecPtr tabPtr;
  FragrecordPtr fragPtr;
  tabPtr.i = tableId;
+21 −11
Original line number Diff line number Diff line
@@ -146,6 +146,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
  regFragPtr.p->m_lcp_scan_op = RNIL; 
  regFragPtr.p->m_lcp_keep_list = RNIL;
  regFragPtr.p->m_var_page_chunks = RNIL;  
  regFragPtr.p->m_restore_lcp_id = RNIL;

  if (ERROR_INSERTED(4007) && regTabPtr.p->fragid[0] == fragId ||
      ERROR_INSERTED(4008) && regTabPtr.p->fragid[1] == fragId) {
@@ -673,11 +674,11 @@ Dbtup::undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused)
  switch(ret){
  case 0:
    return;
  case -1:
    warningEvent("Failed to sync log for create of table: %u", regTabPtr.i);
  default:
    ndbout_c("ret: %d", ret);
    ndbrequire(false);
    execute(signal, req.m_callback, regFragPtr.p->m_logfile_group_id);
  }
  
}

void
@@ -958,8 +959,6 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
    return;
  }

#if NOT_YET_UNDO_DROP_TABLE
#error "This code is complete, but I prefer not to enable it until I need it"
  if (logfile_group_id != RNIL)
  {
    Callback cb;
@@ -968,6 +967,13 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
      safe_cast(&Dbtup::drop_table_log_buffer_callback);
    Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
    int r0 = c_lgman->alloc_log_space(logfile_group_id, sz);
    if (r0)
    {
      jam();
      warningEvent("Failed to alloc log space for drop table: %u",
 		   tabPtr.i);
      goto done;
    }

    Logfile_client lgman(this, c_lgman, logfile_group_id);
    int res= lgman.get_log_buffer(signal, sz, &cb);
@@ -976,15 +982,18 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
      ljam();
      return;
    case -1:
      ndbrequire("NOT YET IMPLEMENTED" == 0);
      warningEvent("Failed to get log buffer for drop table: %u",
		   tabPtr.i);
      c_lgman->free_log_space(logfile_group_id, sz);
      goto done;
      break;
    default:
      execute(signal, cb, logfile_group_id);
      return;
    }
  }
#endif

done:
  drop_table_logsync_callback(signal, tabPtr.i, RNIL);
}

@@ -1163,9 +1172,10 @@ Dbtup::drop_table_log_buffer_callback(Signal* signal, Uint32 tablePtrI,
  switch(ret){
  case 0:
    return;
  case -1:
    warningEvent("Failed to syn log for drop of table: %u", tablePtrI);
  default:
    ndbout_c("ret: %d", ret);
    ndbrequire(false);
    execute(signal, req.m_callback, logfile_group_id);
  }
}

+10 −2
Original line number Diff line number Diff line
@@ -2684,8 +2684,16 @@ Lgman::execute_undo_record(Signal* signal)
      Uint32 lcp = * (ptr - len + 1);
      if(m_latest_lcp && lcp > m_latest_lcp)
      {
	// Just ignore
	break;
        if (0)
        {
          const Uint32 * base = ptr - len + 1;
          Uint32 lcp = base[0];
          Uint32 tableId = base[1] >> 16;
          Uint32 fragId = base[1] & 0xFFFF;

          ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u", 
                   lcp, tableId, fragId);
        }
      }

      if(m_latest_lcp == 0 || 
Loading