Commit 47a24555 authored by tomas@whalegate.ndb.mysql.com's avatar tomas@whalegate.ndb.mysql.com
Browse files

break out tuple data read

parent bfc80ede
Loading
Loading
Loading
Loading
+15 −10
Original line number Diff line number Diff line
@@ -241,19 +241,22 @@ static void dbug_print_table(const char *info, TABLE *table)
static void run_query(THD *thd, char *buf, char *end,
                      const int *no_print_error, my_bool disable_binlog)
{
  ulong save_query_length= thd->query_length;
  char *save_query= thd->query;
  struct system_variables save_variables= thd->variables;
  struct system_status_var save_status_var= thd->status_var;
  ulong save_thd_query_length= thd->query_length;
  char *save_thd_query= thd->query;
  struct system_variables save_thd_variables= thd->variables;
  struct system_status_var save_thd_status_var= thd->status_var;
  THD_TRANS save_thd_transaction_all= thd->transaction.all;
  THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
  ulonglong save_thd_options= thd->options;
  DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
  NET save_net= thd->net;
  NET save_thd_net= thd->net;
  const char* found_semicolon= NULL;

  bzero((char*) &thd->net, sizeof(NET));
  thd->query_length= end - buf;
  thd->query= buf;
  thd->variables.pseudo_thread_id= thread_id;
  thd->transaction.stmt.modified_non_trans_table= FALSE;
  if (disable_binlog)
    thd->options&= ~OPTION_BIN_LOG;
    
@@ -276,11 +279,13 @@ static void run_query(THD *thd, char *buf, char *end,
  }

  thd->options= save_thd_options;
  thd->query_length= save_query_length;
  thd->query= save_query;
  thd->variables= save_variables;
  thd->status_var= save_status_var;
  thd->net= save_net;
  thd->query_length= save_thd_query_length;
  thd->query= save_thd_query;
  thd->variables= save_thd_variables;
  thd->status_var= save_thd_status_var;
  thd->transaction.all= save_thd_transaction_all;
  thd->transaction.stmt= save_thd_transaction_stmt;
  thd->net= save_thd_net;

  if (thd == injector_thd)
  {
+84 −72
Original line number Diff line number Diff line
@@ -534,6 +534,88 @@ TupleS::prepareRecord(TableS & tab){
  return true;
}

int
RestoreDataIterator::readTupleData(Uint32 *buf_ptr, Uint32 *ptr,
                                   Uint32 dataLength)
{
  while (ptr + 2 < buf_ptr + dataLength)
  {
    typedef BackupFormat::DataFile::VariableData VarData;
    VarData * data = (VarData *)ptr;
    Uint32 sz = ntohl(data->Sz);
    Uint32 attrId = ntohl(data->Id); // column_no

    AttributeData * attr_data = m_tuple.getData(attrId);
    const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);
    
    // just a reminder - remove when backwards compat implemented
    if (m_currentTable->backupVersion < MAKE_VERSION(5,1,3) && 
        attr_desc->m_column->getNullable())
    {
      const Uint32 ind = attr_desc->m_nullBitIndex;
      if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize, 
                          buf_ptr,ind))
      {
        attr_data->null = true;
        attr_data->void_value = NULL;
        continue;
      }
    }

    if (m_currentTable->backupVersion < MAKE_VERSION(5,1,3))
    {
      sz *= 4;
    }
    
    attr_data->null = false;
    attr_data->void_value = &data->Data[0];
    attr_data->size = sz;

    //if (m_currentTable->getTableId() >= 2) { ndbout << "var off=" << ptr-buf_ptr << " attrId=" << attrId << endl; }

    /**
     * Compute array size
     */
    const Uint32 arraySize = sz / (attr_desc->size / 8);
    assert(arraySize <= attr_desc->arraySize);

    //convert the length of blob(v1) and text(v1)
    if(!m_hostByteOrder
        && (attr_desc->m_column->getType() == NdbDictionary::Column::Blob
           || attr_desc->m_column->getType() == NdbDictionary::Column::Text)
        && attr_desc->m_column->getArrayType() == NdbDictionary::Column::ArrayTypeFixed)
    {
      char* p = (char*)&attr_data->u_int64_value[0];
      Uint64 x;
      memcpy(&x, p, sizeof(Uint64));
      x = Twiddle64(x);
      memcpy(p, &x, sizeof(Uint64));
    }

    //convert datetime type
    if(!m_hostByteOrder
        && attr_desc->m_column->getType() == NdbDictionary::Column::Datetime)
    {
      char* p = (char*)&attr_data->u_int64_value[0];
      Uint64 x;
      memcpy(&x, p, sizeof(Uint64));
      x = Twiddle64(x);
      memcpy(p, &x, sizeof(Uint64));
    }

    if(!Twiddle(attr_desc, attr_data, attr_desc->arraySize))
    {
      return -1;
    }
    
    ptr += ((sz + 3) >> 2) + 2;
  }

  assert(ptr == buf_ptr + dataLength);

  return 0;
}

const TupleS *
RestoreDataIterator::getNextTuple(int  & res)
{
@@ -630,78 +712,8 @@ RestoreDataIterator::getNextTuple(int & res)
    attr_data->void_value = NULL;
  }

  while (ptr + 2 < buf_ptr + dataLength) {
    typedef BackupFormat::DataFile::VariableData VarData;
    VarData * data = (VarData *)ptr;
    Uint32 sz = ntohl(data->Sz);
    Uint32 attrId = ntohl(data->Id); // column_no

    AttributeData * attr_data = m_tuple.getData(attrId);
    const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);
    
    // just a reminder - remove when backwards compat implemented
    if(m_currentTable->backupVersion < MAKE_VERSION(5,1,3) && 
       attr_desc->m_column->getNullable()){
      const Uint32 ind = attr_desc->m_nullBitIndex;
      if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize, 
			  buf_ptr,ind)){
	attr_data->null = true;
	attr_data->void_value = NULL;
	continue;
      }
    }

    if (m_currentTable->backupVersion < MAKE_VERSION(5,1,3))
    {
      sz *= 4;
    }
    
    attr_data->null = false;
    attr_data->void_value = &data->Data[0];
    attr_data->size = sz;

    //if (m_currentTable->getTableId() >= 2) { ndbout << "var off=" << ptr-buf_ptr << " attrId=" << attrId << endl; }

    /**
     * Compute array size
     */
    const Uint32 arraySize = sz / (attr_desc->size / 8);
    assert(arraySize <= attr_desc->arraySize);

    //convert the length of blob(v1) and text(v1)
    if(!m_hostByteOrder
        && (attr_desc->m_column->getType() == NdbDictionary::Column::Blob
           || attr_desc->m_column->getType() == NdbDictionary::Column::Text)
        && attr_desc->m_column->getArrayType() == NdbDictionary::Column::ArrayTypeFixed)
    {
      char* p = (char*)&attr_data->u_int64_value[0];
      Uint64 x;
      memcpy(&x, p, sizeof(Uint64));
      x = Twiddle64(x);
      memcpy(p, &x, sizeof(Uint64));
    }

    //convert datetime type
    if(!m_hostByteOrder
        && attr_desc->m_column->getType() == NdbDictionary::Column::Datetime)
    {
      char* p = (char*)&attr_data->u_int64_value[0];
      Uint64 x;
      memcpy(&x, p, sizeof(Uint64));
      x = Twiddle64(x);
      memcpy(p, &x, sizeof(Uint64));
    }

    if(!Twiddle(attr_desc, attr_data, attr_desc->arraySize))
      {
	res = -1;
  if ((res = readTupleData(buf_ptr, ptr, dataLength)))
    return NULL;
      }
    
    ptr += ((sz + 3) >> 2) + 2;
  }

  assert(ptr == buf_ptr + dataLength);

  m_count ++;  
  res = 0;
+4 −0
Original line number Diff line number Diff line
@@ -355,6 +355,10 @@ public:
  bool validateFragmentFooter();

  const TupleS *getNextTuple(int & res);

private:

  int readTupleData(Uint32 *buf_ptr, Uint32 *ptr, Uint32 dataLength);
};

class LogEntry {