Commit a8ea9cd1 authored by unknown's avatar unknown
Browse files

ndb dd -

  fix a SR bug (lcp impl. in pgman)


storage/ndb/src/kernel/blocks/pgman.cpp:
  move pgman lcp to after lgman has written LCP records for all fragments
  this also moves snapshot time for extent pages, which is ok as they will be
    undo:ed using normal ops.
  
  an alternative could be to let tup checkpoint it's dirty pages.
  this would lead to extent pages which are not beloning to a certain fragment
parent 59bd66d3
Loading
Loading
Loading
Loading
+11 −44
Original line number Diff line number Diff line
@@ -1135,8 +1135,6 @@ Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
  ndbrequire(m_global_page_pool.seize(copy));
  ptr.p->m_copy_page_i = copy.i;
  
  DBG_LCP("assigning copy page to " << ptr << endl);
  
  signal->theData[0] = PgmanContinueB::LCP_PREPARE;
  signal->theData[1] = ptr.i;
  sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
@@ -1145,11 +1143,8 @@ Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
int
Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
{
  DBG_LCP(<< ptr << " create_copy_page ");
  
  if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state & Page_entry::COPY))
  {
    DBG_LCP(" return original" << endl);
    return ptr.p->m_real_page_i;
  }
  if (! (ptr.p->m_state & Page_entry::COPY))
@@ -1161,21 +1156,16 @@ Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
    m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
    m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
    memcpy(copy.p, src.p, sizeof(GlobalPage));
    DBG_LCP("making copy... ");
  }
  DBG_LCP("return " << ptr.p->m_copy_page_i);
  return ptr.p->m_copy_page_i;
}

void
Pgman::restore_copy_page(Ptr<Page_entry> ptr)
{
  DBG_LCP(ptr << " restore_copy_page");

  Uint32 copyPtrI = ptr.p->m_copy_page_i;
  if (ptr.p->m_state & Page_entry::COPY)
  {
    DBG_LCP(" copy back");
    Ptr<GlobalPage> src;
    Ptr<GlobalPage> copy;
    m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
@@ -1184,7 +1174,6 @@ Pgman::restore_copy_page(Ptr<Page_entry> ptr)
  }
  
  m_global_page_pool.release(copyPtrI);
  DBG_LCP(endl);
  
  ptr.p->m_state &= ~Page_entry::COPY;
  ptr.p->m_copy_page_i = RNIL;
@@ -1196,10 +1185,7 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
  LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
  ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0);
  m_last_lcp = ord->lcpId;
  DBG_LCP("execLCP_FRAG_ORD" << endl);

  ndbrequire(!m_lcp_outstanding);
  m_lcp_curr_bucket = 0;
  DBG_LCP("Pgman::execLCP_FRAG_ORD lcp: " << m_last_lcp << endl);
  
#ifdef VM_TRACE
  debugOut
@@ -1207,8 +1193,6 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
    << " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete
    << " bucket=" << m_lcp_curr_bucket << endl;
#endif
  
  do_lcp_loop(signal, true);
}

void
@@ -1219,6 +1203,9 @@ Pgman::execEND_LCP_REQ(Signal* signal)

  DBG_LCP("execEND_LCP_REQ" << endl);

  ndbrequire(!m_lcp_outstanding);
  m_lcp_curr_bucket = 0;
  
#ifdef VM_TRACE
  debugOut
    << "PGMAN: execEND_LCP_REQ"
@@ -1227,15 +1214,9 @@ Pgman::execEND_LCP_REQ(Signal* signal)
    << " outstanding=" << m_lcp_outstanding << endl;
#endif

  if (m_last_lcp == m_last_lcp_complete)
  {
    ndbrequire(! m_lcp_loop_on);
    signal->theData[0] = m_end_lcp_req.senderData;
    sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
    DBG_LCP("GSN_END_LCP_CONF" << endl);
  }

  m_last_lcp_complete = m_last_lcp;
  
  do_lcp_loop(signal, true);
}

bool
@@ -1257,9 +1238,6 @@ Pgman::process_lcp(Signal* signal)
  // start or re-start from beginning of current hash bucket
  if (m_lcp_curr_bucket != ~(Uint32)0)
  {
    DBG_LCP(" PROCESS LCP m_lcp_curr_bucket" 
	    << m_lcp_curr_bucket << endl);
    
    Page_hashlist::Iterator iter;
    pl_hash.next(m_lcp_curr_bucket, iter);
    Uint32 loop = 0;
@@ -1270,12 +1248,7 @@ Pgman::process_lcp(Signal* signal)
      Ptr<Page_entry>& ptr = iter.curr;
      Page_state state = ptr.p->m_state;
      
      DBG_LCP("LCP " 
	      << " m_lcp_outstanding: " << m_lcp_outstanding
	      << " max_count: " << max_count
	      << " loop: " << loop 
	      << " iter.curr.i: " << iter.curr.i
	      << " " << ptr);
      DBG_LCP("LCP " << ptr << " - ");
      
      if (ptr.p->m_last_lcp < m_last_lcp &&
          (state & Page_entry::DIRTY))
@@ -1323,19 +1296,13 @@ Pgman::process_lcp(Signal* signal)

  if (m_lcp_curr_bucket == ~(Uint32)0  && !m_lcp_outstanding)
  {
    if (m_last_lcp == m_last_lcp_complete)
    {
    DBG_LCP("GSN_END_LCP_CONF" << endl);
    signal->theData[0] = m_end_lcp_req.senderData;
    sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
      DBG_LCP("GSN_END_LCP_CONF" << endl);
    }
    DBG_LCP(" -- RETURN FALSE" << endl);
    m_last_lcp_complete = m_last_lcp;
    m_lcp_curr_bucket = ~(Uint32)0;
    return false;
  }

  DBG_LCP(" -- RETURN TRUE" << endl);
  return true;
}

@@ -1374,7 +1341,7 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
  ndbrequire(m_stats.m_current_io_waits > 0);
  m_stats.m_current_io_waits--;

  ptr.p->m_last_lcp = m_last_lcp;
  ptr.p->m_last_lcp = m_last_lcp_complete;
  do_busy_loop(signal, true);
}