Commit 1121ca8f authored by unknown's avatar unknown
Browse files

ndb - bug#21948 & bug#17605

  fix alloc/free extent in undo log
  allow extent to be reused once a lcp is finished (instead of when next lcp starts)


storage/ndb/include/kernel/signaldata/Extent.hpp:
  Add lsn to alloc extent
storage/ndb/src/kernel/blocks/diskpage.hpp:
  Add (unused) undo entries for drop table, and alloc/free extent
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Notify TSMAN of both start and stop of LCP
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  1) Add unused undo entries for drop table, alloc/free extent
  2) handle create_table better (correct?) in undo log
  3) fix some typos/style
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  1) Add unused undo entries for drop table, alloc/free extent
  2) handle create_table better (correct?) in undo log
  3) fix some typos/style
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  fix style
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp:
  fix typo/style
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  Make sure regFragPtr.p->m_logfile_group_id = RNIL is applicable
storage/ndb/src/kernel/blocks/lgman.cpp:
  Add m_logfile_group_id to log callback
storage/ndb/src/kernel/blocks/print_file.cpp:
  Add (unused) undo entries for drop table, and alloc/free extent
storage/ndb/src/kernel/blocks/tsman.cpp:
  1) change so that LCP limit on resuse of extent is only for duration of lcp
  2) refactor so lookup_extent is put into subroutine
storage/ndb/src/kernel/blocks/tsman.hpp:
  refactor so lookup_extent is put into subroutine
parent ca967328
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -74,6 +74,8 @@ struct FreeExtentReq {
      Local_key key;
      Uint32 table_id;
      Uint32 tablespace_id;
      Uint32 lsn_hi;
      Uint32 lsn_lo;
    } request;
    struct 
    {
+5 −2
Original line number Diff line number Diff line
@@ -11276,7 +11276,7 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal)
      /**
       * First fragment mean that last LCP is complete :-)
       */
      EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
      EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
      jamEntry();
    }
    
@@ -11327,7 +11327,7 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
      /**
       * First fragment mean that last LCP is complete :-)
       */
      EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
      EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
      jamEntry();
    }
    
@@ -11611,6 +11611,9 @@ void Dblqh::completeLcpRoundLab(Signal* signal, Uint32 lcpId)
  sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal, 
	     EndLcpReq::SignalLength, JBB);
  
  EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength);
  jamEntry();
  
  lcpPtr.i = 0;
  ptrAss(lcpPtr, lcpRecord);
  lcpPtr.p->m_outstanding = 3;
+40 −5
Original line number Diff line number Diff line
@@ -625,7 +625,8 @@ struct Fragrecord {
  
  DLList<ScanOp>::Head m_scanList;

  bool m_undo_complete;
  enum { UC_LCP = 1, UC_CREATE = 2 };
  Uint32 m_undo_complete;
  Uint32 m_tablespace_id;
  Uint32 m_logfile_group_id;
  Disk_alloc_info m_disk_alloc_info;
@@ -989,6 +990,9 @@ ArrayPool<TupTriggerData> c_triggerPool;
      ,UNDO_UPDATE = File_formats::Undofile::UNDO_TUP_UPDATE
      ,UNDO_FREE = File_formats::Undofile::UNDO_TUP_FREE
      ,UNDO_CREATE = File_formats::Undofile::UNDO_TUP_CREATE
      ,UNDO_DROP = File_formats::Undofile::UNDO_TUP_DROP
      ,UNDO_ALLOC_EXTENT = File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT
      ,UNDO_FREE_EXTENT = File_formats::Undofile::UNDO_TUP_FREE_EXTENT
    };
    
    struct Alloc 
@@ -1021,6 +1025,30 @@ ArrayPool<TupTriggerData> c_triggerPool;
      Uint32 m_table;
      Uint32 m_type_length; // 16 bit type, 16 bit length
    };

    struct Drop
    {
      Uint32 m_table;
      Uint32 m_type_length; // 16 bit type, 16 bit length
    };

    struct AllocExtent
    {
      Uint32 m_table;
      Uint32 m_fragment;
      Uint32 m_page_no;
      Uint32 m_file_no;
      Uint32 m_type_length;
    };

    struct FreeExtent
    {
      Uint32 m_table;
      Uint32 m_fragment;
      Uint32 m_page_no;
      Uint32 m_file_no;
      Uint32 m_type_length;
    };
  };
  
  Extent_info_pool c_extent_pool;
@@ -1420,7 +1448,7 @@ public:
  int nr_delete(Signal*, Uint32, Uint32 fragPtr, const Local_key*, Uint32 gci);

  void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page);
  void nr_delete_logbuffer_callback(Signal*, Uint32 op, Uint32 page);
  void nr_delete_log_buffer_callback(Signal*, Uint32 op, Uint32 page);
private:
  BLOCK_DEFINES(Dbtup);

@@ -2345,9 +2373,10 @@ private:
                        Uint32 fragId);


  void releaseFragment(Signal* signal, Uint32 tableId);
  void releaseFragment(Signal* signal, Uint32 tableId, Uint32);
  void drop_fragment_free_var_pages(Signal*);
  void drop_fragment_free_exent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
  void drop_fragment_free_extent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
  void drop_fragment_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
  void drop_fragment_unmap_pages(Signal*, TablerecPtr, FragrecordPtr, Uint32);
  void drop_fragment_unmap_page_callback(Signal* signal, Uint32, Uint32);
  
@@ -2632,6 +2661,9 @@ private:
  
  void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32); 

  void disk_page_alloc_extent_log_buffer_callback(Signal*, Uint32, Uint32);
  void disk_page_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
  
  Uint64 disk_page_undo_alloc(Page*, const Local_key*,
			      Uint32 sz, Uint32 gci, Uint32 logfile_group_id);

@@ -2646,6 +2678,9 @@ private:
  void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
  void undo_createtable_logsync_callback(Signal* signal, Uint32, Uint32);

  void drop_table_log_buffer_callback(Signal*, Uint32, Uint32);
  void drop_table_logsync_callback(Signal*, Uint32, Uint32);

  void disk_page_set_dirty(Ptr<Page>);
  void restart_setup_page(Disk_alloc_info&, Ptr<Page>);
  void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
@@ -2679,7 +2714,7 @@ public:
  
private:
  void disk_restart_undo_next(Signal*);
  void disk_restart_undo_lcp(Uint32, Uint32);
  void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
  void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
  void disk_restart_undo_alloc(Apply_undo*);
  void disk_restart_undo_update(Apply_undo*);
+144 −12
Original line number Diff line number Diff line
@@ -68,6 +68,26 @@ operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr)
  return out;
}

#if NOT_YET_FREE_EXTENT
static
inline
bool
check_free(const Dbtup::Extent_info* extP)
{
  Uint32 res = 0;
  for (Uint32 i = 1; i<MAX_FREE_LIST; i++)
    res += extP->m_free_page_count[i];
  return res;
}
#error "Code for deallocting extents when they get empty"
#error "This code is not yet complete"
#endif

#if NOT_YET_UNDO_ALLOC_EXTENT
#error "This is needed for deallocting extents when they get empty"
#error "This code is not complete yet"
#endif

void 
Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc)
{
@@ -441,10 +461,26 @@ Dbtup::disk_page_prealloc(Signal* signal,
      /**
       * We need to alloc an extent
       */
#if NOT_YET_UNDO_ALLOC_EXTENT
      Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id;

      err = c_lgman->alloc_log_space(logfile_group_id,
				     sizeof(Disk_undo::AllocExtent)>>2);
      if(unlikely(err))
      {
	return -err;
      }
#endif

      if (!c_extent_pool.seize(ext))
      {
	jam();
	//XXX
	err= 2;
#if NOT_YET_UNDO_ALLOC_EXTENT
	c_lgman->free_log_space(logfile_group_id, 
				sizeof(Disk_undo::AllocExtent)>>2);
#endif
	c_page_request_pool.release(req);
	ndbout_c("no free extent info");
	return -err;
@@ -452,12 +488,44 @@ Dbtup::disk_page_prealloc(Signal* signal,
      
      if ((err= tsman.alloc_extent(&ext.p->m_key)) < 0)
      {
	jam();
#if NOT_YET_UNDO_ALLOC_EXTENT
	c_lgman->free_log_space(logfile_group_id, 
				sizeof(Disk_undo::AllocExtent)>>2);
#endif
	c_extent_pool.release(ext);
	c_page_request_pool.release(req);
	return err;
      }

      int pages= err;
#if NOT_YET_UNDO_ALLOC_EXTENT
      {
	/**
	 * Do something here
	 */
	{
	  Callback cb;
	  cb.m_callbackData= ext.i;
	  cb.m_callbackFunction = 
	    safe_cast(&Dbtup::disk_page_alloc_extent_log_buffer_callback);
	  Uint32 sz= sizeof(Disk_undo::AllocExtent)>>2;
	  
	  Logfile_client lgman(this, c_lgman, logfile_group_id);
	  int res= lgman.get_log_buffer(signal, sz, &cb);
	  switch(res){
	  case 0:
	    break;
	  case -1:
	    ndbrequire("NOT YET IMPLEMENTED" == 0);
	    break;
	  default:
	    execute(signal, cb, res);	    
	  }
	}
      }
#endif
      
      ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
      ext.p->m_first_page_no = ext.p->m_key.m_page_no;
      bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
@@ -1007,6 +1075,12 @@ Dbtup::disk_page_free(Signal *signal,
  
  extentPtr.p->m_free_space += sz;
  update_extent_pos(alloc, extentPtr);
#if NOT_YET_FREE_EXTENT
  if (check_free(extentPtr.p) == 0)
  {
    ndbout_c("free: extent is free");
  }
#endif
}

void
@@ -1104,13 +1178,55 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,

  extentPtr.p->m_free_space += sz;
  update_extent_pos(alloc, extentPtr);
#if NOT_YET_FREE_EXTENT
  if (check_free(extentPtr.p) == 0)
  {
    ndbout_c("abort: extent is free");
  }
#endif
}

#if NOT_YET_UNDO_ALLOC_EXTENT
void
Dbtup::disk_page_alloc_extent_log_buffer_callback(Signal* signal,
						  Uint32 extentPtrI,
						  Uint32 unused)
{
  Ptr<Extent_info> extentPtr;
  c_extent_pool.getPtr(extentPtr, extentPtrI);

  Local_key key = extentPtr.p->m_key;
  Tablespace_client2 tsman(signal, c_tsman, &key);

  Ptr<Tablerec> tabPtr;
  tabPtr.i= tsman.m_table_id;
  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
  
  Ptr<Fragrecord> fragPtr;
  getFragmentrec(fragPtr, tsman.m_fragment_id, tabPtr.p);

  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);

  Disk_undo::AllocExtent alloc;
  alloc.m_table = tabPtr.i;
  alloc.m_fragment = tsman.m_fragment_id;
  alloc.m_page_no = key.m_page_no;
  alloc.m_file_no = key.m_file_no;
  alloc.m_type_length = (Disk_undo::UNDO_ALLOC_EXTENT<<16)|(sizeof(alloc)>> 2);
  
  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
  
  Uint64 lsn= lgman.add_entry(c, 1);
  
  tsman.update_lsn(&key, lsn);
}
#endif

Uint64
Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
			    Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
{
  Logfile_client lsman(this, c_lgman, logfile_group_id);
  Logfile_client lgman(this, c_lgman, logfile_group_id);
  
  Disk_undo::Alloc alloc;
  alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2);
@@ -1119,7 +1235,7 @@ Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
  
  Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
  
  Uint64 lsn= lsman.add_entry(c, 1);
  Uint64 lsn= lgman.add_entry(c, 1);
  m_pgman.update_lsn(* key, lsn);

  return lsn;
@@ -1130,7 +1246,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
			     const Uint32* src, Uint32 sz,
			     Uint32 gci, Uint32 logfile_group_id)
{
  Logfile_client lsman(this, c_lgman, logfile_group_id);
  Logfile_client lgman(this, c_lgman, logfile_group_id);

  Disk_undo::Update update;
  update.m_page_no = key->m_page_no;
@@ -1148,7 +1264,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,

  ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4));
    
  Uint64 lsn= lsman.add_entry(c, 3);
  Uint64 lsn= lgman.add_entry(c, 3);
  m_pgman.update_lsn(* key, lsn);

  return lsn;
@@ -1159,7 +1275,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
			   const Uint32* src, Uint32 sz,
			   Uint32 gci, Uint32 logfile_group_id)
{
  Logfile_client lsman(this, c_lgman, logfile_group_id);
  Logfile_client lgman(this, c_lgman, logfile_group_id);

  Disk_undo::Free free;
  free.m_page_no = key->m_page_no;
@@ -1177,7 +1293,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
  
  ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
  
  Uint64 lsn= lsman.add_entry(c, 3);
  Uint64 lsn= lgman.add_entry(c, 3);
  m_pgman.update_lsn(* key, lsn);

  return lsn;
@@ -1207,7 +1323,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
    ndbrequire(len == 3);
    Uint32 tableId = ptr[1] >> 16;
    Uint32 fragId = ptr[1] & 0xFFFF;
    disk_restart_undo_lcp(tableId, fragId);
    disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP);
    disk_restart_undo_next(signal);
    return;
  }
@@ -1246,10 +1362,20 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
    ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
      if (tabPtr.p->fragrec[i] != RNIL)
	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i]);
	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
			      Fragrecord::UC_CREATE);
    disk_restart_undo_next(signal);
    return;
  }
  case File_formats::Undofile::UNDO_TUP_DROP:
    jam();
  case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
    jam();
  case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
    jam();
    disk_restart_undo_next(signal);
    return;

  case File_formats::Undofile::UNDO_END:
    f_undo_done = true;
    return;
@@ -1283,7 +1409,7 @@ Dbtup::disk_restart_undo_next(Signal* signal)
}

void
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId)
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
{
  Ptr<Tablerec> tabPtr;
  tabPtr.i= tableId;
@@ -1295,7 +1421,7 @@ Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId)
    getFragmentrec(fragPtr, fragId, tabPtr.p);
    if (!fragPtr.isNull())
    {
      fragPtr.p->m_undo_complete = true;
      fragPtr.p->m_undo_complete |= flag;
    }
  }
}
@@ -1502,6 +1628,12 @@ Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
  if (tabPtr.p->tableStatus == DEFINED)
  {
    getFragmentrec(fragPtr, fragId, tabPtr.p);
    if (fragPtr.p->m_undo_complete & Fragrecord::UC_CREATE)
    {
      jam();
      return -1;
    }

    if (!fragPtr.isNull())
    {
      Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+3 −3
Original line number Diff line number Diff line
@@ -3129,7 +3129,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
    disk_page_set_dirty(disk_page);

    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
      safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
    res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
    switch(res){
@@ -3182,7 +3182,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
  Callback cb;
  cb.m_callbackData = userpointer;
  cb.m_callbackFunction =
    safe_cast(&Dbtup::nr_delete_logbuffer_callback);      
    safe_cast(&Dbtup::nr_delete_log_buffer_callback);      
  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
  int res= lgman.get_log_buffer(signal, sz, &cb);
  switch(res){
@@ -3202,7 +3202,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
}

void
Dbtup::nr_delete_logbuffer_callback(Signal* signal, 
Dbtup::nr_delete_log_buffer_callback(Signal* signal, 
				    Uint32 userpointer, 
				    Uint32 unused)
{
Loading