Commit 8e7d5f78 authored by unknown's avatar unknown
Browse files

bug#18594 ndb_restore log boken in 5.1

- corrected previous patch
- read log entry variables out into explicit variables for siimpler code (and backwards compatability code)

parent 925a9dcf
Loading
Loading
Loading
Loading
+39 −47
Original line number Diff line number Diff line
@@ -961,15 +961,15 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md)
const LogEntry *
RestoreLogIterator::getNextLogEntry(int & res) {
  // Read record length
  typedef BackupFormat::LogFile::LogEntry LogE;
  typedef BackupFormat::LogFile::LogEntry_no_fragid LogE_no_fragid;
  const Uint32 offset= 3;
  assert(offset == (offsetof(LogE, Data) >> 2) - 1);
  LogE * logE= 0;
  Uint32 len= ~0;
  const Uint32 stopGCP = m_metaData.getStopGCP();
  NdbAutoPtr<char> ap1;
  Uint32 tableId;
  Uint32 triggerEvent;
  Uint32 frag_id;
  Uint32 *attr_data;
  Uint32 attr_data_len;
  do {
    Uint32 len;
    Uint32 *logEntryPtr;
    if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
      res= -1;
      return 0;
@@ -977,7 +977,7 @@ RestoreLogIterator::getNextLogEntry(int & res) {
    len= ntohl(len);

    Uint32 data_len = sizeof(Uint32) + len*4;
    if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) {
    if (buffer_get_ptr((void **)(&logEntryPtr), 1, data_len) != data_len) {
      res= -2;
      return 0;
    }
@@ -986,7 +986,8 @@ RestoreLogIterator::getNextLogEntry(int & res) {
      res= 0;
      return 0;
    }
    if (m_metaData.getFileHeader().NdbVersion < NDBD_FRAGID_VERSION)

    if (unlikely(m_metaData.getFileHeader().NdbVersion < NDBD_FRAGID_VERSION))
    {
      /*
        FragId was introduced in LogEntry in version
@@ -994,48 +995,39 @@ RestoreLogIterator::getNextLogEntry(int & res) {
        We set FragId to 0 in older versions (these versions
        do not support restore of user defined partitioned
        tables.

        These log entries miss one Uint32 FragId, hence missing_len=1

        Reconstruct a new log entry with old.
      */
      const Uint32 missing_len= 1;
      assert((offsetof(LogE, Data) - offsetof(LogE_no_fragid, Data)) >> 2 ==
             missing_len);
      LogE_no_fragid * logE_no_fragid= (LogE_no_fragid *)logE;
      
      int i;
      LogE *tmpLogE= (LogE*)NdbMem_Allocate(data_len + missing_len*4);
      if (!tmpLogE)
      typedef BackupFormat::LogFile::LogEntry_no_fragid LogE_no_fragid;
      LogE_no_fragid * logE_no_fragid= (LogE_no_fragid *)logEntryPtr;
      tableId= ntohl(logE_no_fragid->TableId);
      triggerEvent= ntohl(logE_no_fragid->TriggerEvent);
      frag_id= 0;
      attr_data= &logE_no_fragid->Data[0];
      attr_data_len= len - ((offsetof(LogE_no_fragid, Data) >> 2) - 1);
    }
    else /* normal case */
    {
        res = -2;
        return 0;
      }
      ap1.reset((char*)tmpLogE);
      bzero(tmpLogE, data_len + missing_len*4);
      /* correct len to reflect new logEntry version length */
      len+= missing_len;
      tmpLogE->Length = logE_no_fragid->Length;
      tmpLogE->TableId = logE_no_fragid->TableId;
      tmpLogE->TriggerEvent = logE_no_fragid->TriggerEvent;
      for (i = 0; i < len - offset; i++)
        tmpLogE->Data[i] = logE_no_fragid->Data[i];
      logE= tmpLogE;
      typedef BackupFormat::LogFile::LogEntry LogE;
      LogE * logE= (LogE *)logEntryPtr;
      tableId= ntohl(logE->TableId);
      triggerEvent= ntohl(logE->TriggerEvent);
      frag_id= ntohl(logE->FragId);
      attr_data= &logE->Data[0];
      attr_data_len= len - ((offsetof(LogE, Data) >> 2) - 1);
    }
    logE->TableId= ntohl(logE->TableId);
    logE->TriggerEvent= ntohl(logE->TriggerEvent);
    
    const bool hasGcp= (logE->TriggerEvent & 0x10000) != 0;
    logE->TriggerEvent &= 0xFFFF;
    const bool hasGcp= (triggerEvent & 0x10000) != 0;
    triggerEvent &= 0xFFFF;

    if(hasGcp){
      // last attr_data is gci info
      attr_data_len--;
      len--;
      m_last_gci = ntohl(logE->Data[len-offset]);
      m_last_gci = ntohl(*(attr_data + attr_data_len));
    }
  } while(m_last_gci > stopGCP + 1);

  m_logEntry.m_table = m_metaData.getTable(logE->TableId);
  switch(logE->TriggerEvent){
  m_logEntry.m_table = m_metaData.getTable(tableId);
  switch(triggerEvent){
  case TriggerEvent::TE_INSERT:
    m_logEntry.m_type = LogEntry::LE_INSERT;
    break;
@@ -1053,10 +1045,10 @@ RestoreLogIterator::getNextLogEntry(int & res) {
  const TableS * tab = m_logEntry.m_table;
  m_logEntry.clear();

  AttributeHeader * ah = (AttributeHeader *)&logE->Data[0];
  AttributeHeader *end = (AttributeHeader *)&logE->Data[len - offset];
  AttributeHeader * ah = (AttributeHeader *)attr_data;
  AttributeHeader *end = (AttributeHeader *)(attr_data + attr_data_len);
  AttributeS *  attr;
  m_logEntry.m_frag_id = ntohl(logE->FragId);
  m_logEntry.m_frag_id = frag_id;
  while(ah < end){
    attr= m_logEntry.add_attr();
    if(attr == NULL) {